class FluxScriptAdapter(SchedulerScriptAdapter):
    """Interface class for the flux scheduler (on Spectrum MPI)."""

    key = "flux"

    def __init__(self, **kwargs):
        """
        Initialize an instance of the FluxScriptAdapter.

        The FluxScriptAdapter is this package's interface to the Flux
        scheduler. This adapter constructs Flux scripts for a StudyStep based
        on user set defaults and local settings present in each step.

        Keyword arguments read directly by this adapter:

        * version: Flux adapter interface version (default
          FluxFactory.latest).
        * uri: Flux broker URI to schedule to; when not given the FLUX_URI
          environment variable is used if set.
        * queue: Scheduler queue scripts should be submitted to.
        * bank: The account to charge computing time to.
        * allocation_args: Extra flux arguments for the batch allocation.
        * launcher_args: Extra flux arguments for the parallel launcher.
        * args: Additional arguments forwarded to the launcher interface.

        All keyword arguments are also passed to the parent adapter; other
        keys (e.g. host) are not read here.

        :param **kwargs: A dictionary with default settings for the adapter.
        """
        super(FluxScriptAdapter, self).__init__(**kwargs)

        # Store the interface we're using
        _version = kwargs.pop("version", FluxFactory.latest)
        self.add_batch_parameter("version", _version)
        self._interface = FluxFactory.get_interface(_version)
        # NOTE: should we also log the parsed 'base version' used when
        # comparing the adapter/broker versions along with the raw string we
        # get back from flux?
        self._broker_version = self._interface.get_flux_version()

        # A broker uri is optional: the study specification wins, then the
        # FLUX_URI environment variable; without either, jobs go to the root
        # instance as standalone batch jobs.
        uri = kwargs.pop("uri", None)
        if uri:
            LOGGER.info(
                "Using FLUX_URI found in study specification: %s", uri)
        else:
            uri = os.environ.get("FLUX_URI", None)
            if uri:
                LOGGER.info(
                    "Found FLUX_URI in environment, scheduling jobs to broker "
                    "uri %s", uri)
            else:
                LOGGER.info(
                    "No FLUX_URI; scheduling standalone batch job to root "
                    "instance")

        # NOTE: Host doesn't seem to matter for FLUX. sbatch assumes that the
        # current host is where submission occurs.
        # A null entry in the spec (None) is treated like an empty mapping so
        # the exclusive flag handling below doesn't fail on it.
        self._allocation_args = kwargs.get("allocation_args") or {}
        LOGGER.info("Allocation args: %s", self._allocation_args)
        self._launcher_args = kwargs.get("launcher_args") or {}
        self._addl_args = kwargs.get("args", {})

        # Normalize the allocation and launcher args to a flat dict, removing
        # dotpath encoding.
        # TODO: work on a validation api that can be hooked up to yamlspec
        # ingestion machinery for early error messaging
        if self._allocation_args:
            self._allocation_args = self._interface.normalize_additional_args(
                self._allocation_args,
                group_name="allocation",
                filter_unknown=True,
            )
        if self._launcher_args:
            self._launcher_args = self._interface.normalize_additional_args(
                self._launcher_args)

        # Default queue/bank to "" such that 'truthiness' can exclude them
        # from the jobspec/scripts if not provided
        queue = kwargs.pop("queue", "")
        bank = kwargs.pop("bank", "")
        available_queues = self._interface.get_broker_queues()
        # Ignore queue if specified and we detect broker only has anonymous
        # queue
        if not available_queues and queue:
            LOGGER.info(
                "Flux Broker '%s' only has an anonymous queue: "
                "ignoring batch setting '%s'",
                uri,
                queue,
            )
            queue = ""
        self.add_batch_parameter("queue", queue)
        self.add_batch_parameter("bank", bank)

        # Check for the exclusive flag in additional args and pop it off,
        # letting the step key win later.
        # NOTE: only the presence of a key matters, mimicking cli flags.
        # TODO: promote this to formally supported behavior for all adapters,
        # push down into interfaces?
        self._exclusive = {
            "allocation": self._pop_exclusive_flags(
                self._allocation_args, "allocation_args"),
            "launcher": self._pop_exclusive_flags(
                self._launcher_args, "launcher_args"),
        }
        self.add_batch_parameter("exclusive", self._exclusive['allocation'])

        # Populate formally supported flux directives in the header. The
        # templates are filled with str.format, so '{{{{id}}}}' renders to
        # the literal '{{id}}' template that flux expands to the job id.
        self._flux_directive = "#flux: "
        directive = self._flux_directive
        self._header = {
            "nodes": directive + "-N {nodes}",
            "procs": directive + "-n {procs}",
            # NOTE: always use seconds to guard against upstream default
            # behavior changes
            "walltime": directive + "-t {walltime}s",
            "queue": directive + "-q {queue}",
            "bank": directive + "--bank={bank}",
            "job-name": (
                directive + "--job-name=\"{job-name}\"\n"
                + directive + "--output={job-name}.{{{{id}}}}.out\n"
                + directive + "--error={job-name}.{{{{id}}}}.err"
            ),
        }

        self._cmd_flags = {
            "ntasks": "-n",
            "nodes": "-N",
        }
        self._extension = "flux.sh"
        self.h = None

        # Additional info lines for the header: MAESTRO only! flux ignores
        # anything after the first non-flux-directive line, so get_header
        # writes these before the flux directives.
        self._info_directive = "#MAESTRO-INFO "
        self._header_info = {
            "version":
                self._info_directive + "(flux adapter version) {version}",
            "flux_version":
                self._info_directive + "(flux version) {flux_version}",
        }
        if uri:
            self.add_batch_parameter("flux_uri", uri)
            self._header_info['flux_uri'] = \
                self._info_directive + "(flux_uri) {flux_uri}"

    @staticmethod
    def _pop_exclusive_flags(args, group_name):
        """
        Remove the 'x'/'exclusive' flags from an argument dict.

        :param args: Allocation or launcher args; mutated in place.
        :param group_name: Name of the arg group used in the duplicate-flag
            warning (e.g. "allocation_args").
        :returns: True if either flag was present, otherwise False.
        """
        exclusive_keys = ("x", "exclusive")
        if all(ekey in args for ekey in exclusive_keys):
            LOGGER.warning(
                "Repeated addition of exclusive flags 'x' and 'exclusive' "
                "in %s.", group_name)
        present = [ekey for ekey in exclusive_keys if ekey in args]
        for ekey in present:
            args.pop(ekey)
        return bool(present)

    @property
    def extension(self):
        """File extension used for the generated scripts ("flux.sh")."""
        return self._extension

    def _convert_walltime_to_seconds(self, walltime):
        """
        Convert a step walltime into seconds.

        Accepted forms:

        * falsy values (None, "", 0) or the string "inf": returns 0
        * an int/float or an all-digit string: treated as minutes, returns
          an int number of seconds
        * a colon separated string such as "HH:MM:SS": the right-most field
          is seconds and each field to its left is weighted by a further
          factor of 60; returns a float number of seconds

        :param walltime: The walltime value from a step's run block.
        :returns: The walltime in seconds.
        :raises ValueError: If walltime matches none of the accepted forms.
        """
        # Check the unset cases first: evaluating ':' in None would raise a
        # TypeError before this branch could ever be reached.
        if not walltime or (isinstance(walltime, str) and walltime == "inf"):
            return 0

        is_numeric = isinstance(walltime, (int, float)) or (
            isinstance(walltime, str) and walltime.isnumeric())
        if is_numeric:
            LOGGER.debug("Encountered numeric walltime = %s", str(walltime))
            return int(float(walltime) * 60.0)

        if isinstance(walltime, str) and ":" in walltime:
            LOGGER.debug("Converting %s to seconds...", walltime)
            seconds = 0.0
            for power, value in enumerate(reversed(walltime.split(":"))):
                seconds += float(value) * (60.0 ** power)
            return seconds

        msg = (
            f"Walltime value '{walltime}' is not an integer or colon-"
            f"separated string."
        )
        LOGGER.error(msg)
        raise ValueError(msg)

    def pack_addtl_batch_args(self):
        """
        Normalize the allocation args and pack up into the interface specific
        groups that have associated jobspec methods, e.g. conf, setattr,
        setopt.

        Args the interface does not map to a group are skipped; flag style
        values of None are replaced with 1. self._allocation_args is not
        modified.

        :return: dictionary of allocation arg groups to attach to jobspecs
        """
        addtl_batch_args = {
            arg_type: {}
            for arg_type in self._interface.addtl_alloc_arg_types()
        }

        for arg_key, arg_values in self._allocation_args.items():
            # TODO: move this into a validation function for pre launch
            # batch args validation
            arg_type = self._interface.addtl_alloc_arg_type_map(arg_key)
            if arg_type is None:
                # NO WARNINGS HERE: args in 'misc' type handled elsewhere
                # TODO: add better mechanism for tracking which args
                # actually get used; dicts can't do this..
                continue

            # Match default of flag types in flux cli.
            # see https://github.com/flux-framework/flux-core/blob/a3860d4dea5b5a17c473cff4385276e882275252/src/bindings/python/flux/cli/base.py#L734
            # NOTE: only doing this in alloc; let LAUNCHER cli pass through
            # to flux cli (None values are omitted, e.g.
            # {o: fastload: None} renders to -o fastload).
            # Python api doesn't appear to have default value handling?
            # Build a new dict so the stored allocation args are untouched.
            new_arg_values = {
                key: 1 if value is None else value
                for key, value in arg_values.items()
            }
            addtl_batch_args[arg_type].update(new_arg_values)

        return addtl_batch_args

    def get_header(self, step):
        """
        Generate the header present at the top of Flux execution scripts.

        The header is a shebang line, the MAESTRO-INFO lines, a blank '#'
        separator, then the '#flux:' directives. Queue and bank directives
        are omitted when their batch values are empty.

        :param step: A StudyStep instance.
        :returns: A string of the header based on internal batch parameters and
            the parameter step.
        """
        run = dict(step.run)
        batch_header = dict(self._batch)
        batch_header["walltime"] = str(
            self._convert_walltime_to_seconds(step.run.get("walltime", None)))

        # Unpack exclusive to modify procs/core per task
        step_exclusive = self.resolve_exclusive(
            self._exclusive, step.run.get("exclusive", None))

        # Abusing 'truthiness' here to catch None and '' node counts too
        if run.get("nodes", None):
            batch_header["nodes"] = run.pop("nodes")
        elif step_exclusive['allocation']:
            LOGGER.error(
                "Invalid use of exclusive on allocation: "
                "exclusive can only be set with a node count"
            )

        procs = run.get("procs", None)
        if procs and not step_exclusive['allocation']:
            batch_header["procs"] = run.pop("procs")

        batch_header["job-name"] = step.name.replace(" ", "_")
        batch_header["comment"] = step.description.replace("\n", " ")
        batch_header["flux_version"] = self._broker_version

        # Omit queue/bank when unset ('' default, or an anonymous-queue
        # broker was detected) so no empty '-q'/'--bank=' directive is
        # written; submit() likewise maps '' to None for both.
        for optional_key in ("queue", "bank"):
            if not batch_header.get(optional_key):
                batch_header.pop(optional_key, None)

        modified_header = ["#!{}".format(self._exec)]
        # Process INFO lines at the start to avoid interrupting flux
        # directives
        modified_header.extend(
            info_line.format(**batch_header)
            for info_line in self._header_info.values()
        )
        # Inject a blank # comment line between the info/flux directives for
        # readability
        modified_header.append("#")

        for key, template in self._header.items():
            if key not in batch_header:
                continue
            LOGGER.debug("Processing header key '%s': %s", key, template)
            modified_header.append(template.format(**batch_header))

        # Add exclusive flag to header if requested
        if step_exclusive['allocation']:
            modified_header.append(self._flux_directive + "--exclusive")

        # Process any optional allocation args
        for rendered_arg in self._interface.render_additional_args(
                self._allocation_args):
            if rendered_arg:
                # Silent pass through for old versions which don't implement
                # any interface for batch/allocation args
                modified_header.append(self._flux_directive + rendered_arg)

        return "\n".join(modified_header)

    def get_parallelize_command(self, procs, nodes=None, **kwargs):
        """
        Generate the FLUX parallelization segment of the command line.

        :param procs: Number of processors to allocate to the parallel call.
        :param nodes: Number of nodes to allocate to the parallel call
            (default = None).
        :param kwargs: Extra launcher settings forwarded to the interface; an
            'exclusive' entry is resolved against the batch defaults first.
        :returns: A string of the parallelize command configured using nodes
            and procs.
        """
        # Handle the exclusive flags, updating batch block settings (default)
        # with what's set in the step, if any given
        step_exclusive = kwargs.pop("exclusive", None)
        step_exclusive = self.resolve_exclusive(self._exclusive, step_exclusive)
        # TODO: fix this temp hack when standardizing the exclusive key
        # handling
        kwargs['exclusive'] = step_exclusive['launcher']

        return self._interface.parallelize(
            procs, nodes=nodes, addtl_args=self._addl_args,
            launcher_args=self._launcher_args, **kwargs)

    def submit(self, step, path, cwd, job_map=None, env=None):
        """
        Submit a script to the Flux scheduler.

        :param step: The StudyStep instance this submission is based on.
        :param path: Local path to the script to be executed.
        :param cwd: Path to the current working directory.
        :param job_map: A dictionary mapping step names to their job
            identifiers (not used by this adapter).
        :param env: A dict containing a modified environment for execution
            (not used by this adapter).
        :returns: A SubmissionRecord of the submission status, return code
            and job identifier.
        :raises ValueError: If nodes, procs or gpus are not integer-like, or
            the computed core count is not positive.
        """
        nodes = step.run.get("nodes", None)
        processors = step.run.get("procs", 1)

        # Handle the exclusive flags, updating batch block settings (default)
        # with what's set in the step, if any given
        step_exclusive = self.resolve_exclusive(
            self._exclusive, step.run.get("exclusive", None))

        # TODO: track this down and normalize this upstream for all adapters
        if nodes == '':
            nodes = None
        if nodes is not None and not isinstance(nodes, int):
            nodes = int(nodes)

        # TODO: revisit this after tracking down '' behavior seen with nodes
        if not isinstance(processors, int):
            # python api requires >= 1 for procs/tasks
            processors = int(processors) if processors else 1

        force_broker = step.run.get("nested", True)
        walltime = self._convert_walltime_to_seconds(
            step.run.get("walltime", 0))
        urgency = self.get_priority(step.run.get("priority", "medium"))

        # Compute cores per task
        cores_per_task = step.run.get("cores per task", None)
        if isinstance(cores_per_task, str):
            try:
                cores_per_task = int(cores_per_task)
            except ValueError:
                cores_per_task = 1
        if not cores_per_task:
            cores_per_task = 1  # flux python api defaults to 1
            LOGGER.warning(
                "'cores per task' set to a non-value. Populating with a "
                "sensible default. (cores per task = %d)", cores_per_task)

        # Calculate ngpus
        ngpus = step.run.get("gpus", "0")
        try:
            ngpus = int(ngpus) if ngpus else 0
        except ValueError:
            # TODO: normalize this upstream for all adapters
            LOGGER.error("Specified gpus '%s' is not a decimal value.", ngpus)
            raise

        # Raise an exception if the total core count is not positive
        ncores = cores_per_task * processors
        if ncores <= 0:
            msg = "Invalid number of cores specified. " \
                "Aborting. (ncores = {})".format(ncores)
            LOGGER.error(msg)
            raise ValueError(msg)

        # Modify jobspec if exclusive to exclude procs/cores/gpus keys
        # TODO: sort out confusing mixing of per task specs here and jobspec
        # creation calls using per slot specs...
        # TODO: refactor into shared resource modification between this and
        # write_script/parallelize
        if nodes and step_exclusive['allocation']:
            # NOTE: should we do this here or inside the interface's submit?
            processors = nodes  # one slot per node
            cores_per_task = 1  # cores per slot >= 1 in python api
            # ngpus_per_slot is filtered out inside the interface's submit

        # Unpack waitable flag and pass it along if there: only pass it along
        # if it's in the step maybe, leaving each adapter to retain their
        # defaults?
        waitable = step.run.get("waitable", False)

        # Normalize the allocation args to api flux.job.JobspecV1 expects
        packed_alloc_args = self._interface.pack_addtl_batch_args(
            self._allocation_args)

        # Add queue and bank; '' means "not set" and is sent as None
        queue = self._batch["queue"]
        if queue == "":
            queue = None
        bank = self._batch["bank"]
        if bank == "":
            bank = None

        jobid, retcode, submit_status = self._interface.submit(
            nodes, processors, cores_per_task, path, cwd, walltime, ngpus,
            job_name=step.name, force_broker=force_broker, urgency=urgency,
            waitable=waitable, queue=queue, bank=bank,
            addtl_batch_args=packed_alloc_args,
            exclusive=step_exclusive['allocation'],
        )

        return SubmissionRecord(submit_status, retcode, jobid)

    def check_jobs(self, joblist):
        """
        For the given job list, query execution status.

        Statuses are fetched through the flux interface's get_statuses; any
        exception it raises is logged and reported as JobStatusCode.ERROR.

        :param joblist: A list of job identifiers to be queried (a single
            integer identifier is also accepted).
        :returns: The return code of the status query, and a dictionary of job
            identifiers to their status.
        """
        LOGGER.debug("Joblist type -- %s", type(joblist))
        LOGGER.debug("Joblist contents -- %s", joblist)

        if not joblist:
            LOGGER.debug("Empty job list specified.")
            return JobStatusCode.OK, {}

        if not isinstance(joblist, list):
            LOGGER.debug("Specified parameter is not a list.")
            if isinstance(joblist, int):
                LOGGER.debug("Integer found.")
                joblist = [joblist]
            else:
                LOGGER.debug("Unknown type. Returning an error.")
                return JobStatusCode.ERROR, {}

        try:
            chk_status, status = self._interface.get_statuses(joblist)
        except Exception as excpt:
            LOGGER.error(str(excpt))
            status = {}
            chk_status = JobStatusCode.ERROR

        return chk_status, status

    def cancel_jobs(self, joblist):
        """
        For the given job list, cancel each job.

        :param joblist: A list of job identifiers to be cancelled.
        :returns: A CancellationRecord indicating if jobs were cancelled.
        """
        # If we don't have any jobs to cancel, just return status OK. Return
        # a record like the non-empty path so callers get a consistent type.
        if not joblist:
            return CancellationRecord(CancelCode.OK, 0)

        c_status, r_code = self._interface.cancel(joblist)
        return CancellationRecord(c_status, r_code)

    def _state(self, flux_state):
        """
        Map a scheduler specific job state to a Study.State enum.

        No longer supported: status mapping now happens in the flux
        interface.

        :param flux_state: String representation of scheduler job status.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError(
            "FluxScriptAdapter no longer uses the _state mapping.")

    def _write_script(self, ws_path, step):
        """
        Write a Flux script to the workspace of a workflow step.

        The command script always gets the full flux header. A restart script
        (if the step has a restart command) gets the flux header when the step
        is to be scheduled, otherwise only a shebang line.

        :param ws_path: Path to the workspace directory of the step.
        :param step: An instance of a StudyStep.
        :returns: Boolean value (True if to be scheduled), the path to the
            written script for run["cmd"], and the path to the script
            written for run["restart"] (None if there is no restart).
        """
        to_be_scheduled, cmd, restart = self.get_scheduler_command(step)

        fname = "{}.{}".format(step.name, self._extension)
        script_path = make_safe_path(ws_path, fname)
        with open(script_path, "w") as script:
            script.write(self.get_header(step))
            script.write("\n\n{}\n".format(cmd))

        restart_path = None
        if restart:
            rname = "{}.restart.{}".format(step.name, self._extension)
            restart_path = os.path.join(ws_path, rname)

            with open(restart_path, "w") as script:
                if to_be_scheduled:
                    script.write(self.get_header(step))
                else:
                    # self._exec is a bare interpreter path (get_header
                    # prefixes it with '#!'), so add the shebang marker here
                    # too instead of emitting the path as a command line.
                    script.write("#!{}".format(self._exec))
                script.write("\n\n{}\n".format(restart))

        return to_be_scheduled, script_path, restart_path

    def get_priority(self, priority):
        """
        Convert a study priority setting into a flux urgency value.

        :param priority: Priority from the step's run block (e.g. "medium").
        :returns: The urgency reported by the flux interface's
            get_flux_urgency.
        """
        return self._interface.get_flux_urgency(priority)