Skip to content

Fluxscriptadapter

Flux Scheduler interface implementation.

FluxScriptAdapter

Bases: SchedulerScriptAdapter

Interface class for the flux scheduler (on Spectrum MPI).

Source code in maestrowf/interfaces/script/fluxscriptadapter.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
class FluxScriptAdapter(SchedulerScriptAdapter):
    """Interface class for the flux scheduler (on Spectrum MPI)."""

    key = "flux"

    def __init__(self, **kwargs):
        """
        Initialize an instance of the FluxScriptAdapter.

        The FluxScriptAdapter is this package's interface to the Flux
        scheduler. This adapter constructs Flux scripts for a StudyStep based
        on user set defaults and local settings present in each step.

        The keyword arguments used when the Flux adapter is instantiated
        are as follows:
        * host: The cluster to execute scripts on.
        * bank: The account to charge computing time to.
        * queue: Scheduler queue scripts should be submitted to.
        * nodes: The number of compute nodes to be reserved for computing.
        * version: Flux interface version (default: FluxFactory.latest).
        * uri: Flux broker uri; the FLUX_URI environment variable is used
          when this is not given.
        * allocation_args: Additional arguments for the batch allocation.
        * launcher_args: Additional arguments for the launcher.
        * args: Additional arguments passed along to the parallelize call.

        :param **kwargs: A dictionary with default settings for the adapter.
        """
        super(FluxScriptAdapter, self).__init__(**kwargs)

        # Store the interface we're using
        _version = kwargs.pop("version", FluxFactory.latest)
        self.add_batch_parameter("version", _version)
        self._interface = FluxFactory.get_interface(_version)
        # Note, should we also log parsed 'base version' used when comparing
        # the adaptor/broker versions along with the raw string we get back
        # from flux.
        self._broker_version = self._interface.get_flux_version()

        # Resolve the broker uri: the study specification wins, then the
        # environment. A missing uri is deliberately not an error.
        uri = kwargs.pop("uri", None)
        if not uri:
            uri = os.environ.get("FLUX_URI", None)
            if uri:
                LOGGER.info(f"Found FLUX_URI in environment, scheduling jobs to broker uri {uri}")
            else:
                LOGGER.info("No FLUX_URI; scheduling standalone batch job to root instance")
        else:
            LOGGER.info(f"Using FLUX_URI found in study specification: {uri}")

        # NOTE: Host doesn't seem to matter for FLUX. sbatch assumes that the
        # current host is where submission occurs.
        # 'or {}' guards against keys that are present but set to None, which
        # would otherwise break the membership tests further down.
        self._allocation_args = kwargs.get("allocation_args") or {}
        LOGGER.info(f"Allocation args: {self._allocation_args}")
        self._launcher_args = kwargs.get("launcher_args") or {}
        self._addl_args = kwargs.get("args") or {}

        # Normalize the allocation and launcher args to a flat dict, removing dotpath encoding
        # TODO: workon a validation api that can be hooked up to yamlspec ingestion machinery for
        # early error messaging
        if self._allocation_args:
            self._allocation_args = self._interface.normalize_additional_args(
                self._allocation_args,
                group_name="allocation",
                filter_unknown=True
            )

        if self._launcher_args:
            self._launcher_args = self._interface.normalize_additional_args(self._launcher_args)

        # Default queue/bank to "" such that 'truthiness' can exclude them
        # from the jobspec/scripts if not provided
        queue = kwargs.pop("queue", "")
        bank = kwargs.pop("bank", "")
        available_queues = self._interface.get_broker_queues()
        # Ignore queue if specified and we detect broker only has anonymous queue
        if not available_queues and queue:
            LOGGER.info(
                "Flux Broker '%s' only has an anonymous queue: "
                "ignoring batch setting '%s'",
                uri,
                queue,
            )
            queue = ""

        self.add_batch_parameter("queue", queue)
        self.add_batch_parameter("bank", bank)

        # Check for the flag in additional args and pop it off, letting step key win later
        # NOTE: only need presence of key as this mimics cli like flag behavior
        # TODO: promote this to formally supported behavior for all adapters, push down into interfaces?
        exclusive_keys = ['x', 'exclusive']
        self._exclusive = {"allocation": False, "launcher": False}
        if all(ekey in self._allocation_args for ekey in exclusive_keys):
            LOGGER.warning("Repeated addition of exclusive flags 'x' and 'exclusive' in allocation_args.")

        alloc_eflags = [self._allocation_args.pop(ekey) for ekey in exclusive_keys if ekey in self._allocation_args]
        if alloc_eflags:
            self._exclusive['allocation'] = True

        if all(ekey in self._launcher_args for ekey in exclusive_keys):
            LOGGER.warning("Repeated addition of exclusive flags 'x' and 'exclusive' in launcher_args.")

        launcher_eflags = [self._launcher_args.pop(ekey) for ekey in exclusive_keys if ekey in self._launcher_args]
        if launcher_eflags:
            self._exclusive['launcher'] = True

        self.add_batch_parameter("exclusive", self._exclusive['allocation'])

        # Populate formally supported flux directives in the header.
        # The templates are rendered later with str.format, so the plain
        # (non f-string) pieces keep their {placeholders} intact.
        self._flux_directive = "#flux: "
        self._header = {
            "nodes": f"{self._flux_directive}" + "-N {nodes}",
            "procs": f"{self._flux_directive}" + "-n {procs}",
            # NOTE: always use seconds to guard against upstream default behavior changes
            "walltime": f"{self._flux_directive}" + "-t {walltime}s",
            "queue": f"{self._flux_directive}" + "-q {queue}",
            "bank": f"{self._flux_directive}" + "--bank={bank}",
            "job-name":
                f"{self._flux_directive}" + "--job-name=\"{job-name}\"\n"
                f"{self._flux_directive}" + "--output={job-name}.{{{{id}}}}.out\n"
                f"{self._flux_directive}" + "--error={job-name}.{{{{id}}}}.err"
        }

        self._cmd_flags = {
            "ntasks": "-n",
            "nodes": "-N",
        }
        self._extension = "flux.sh"
        self.h = None

        # Addition info flags to add to the header: MAESTRO only! flux ignores
        # anything after first non-flux-directive line so this must go last
        # NOTE(review): get_header currently emits these info lines before
        # the flux directives -- confirm which ordering is intended.
        self._info_directive = "#MAESTRO-INFO "
        self._header_info = {
            "version": f"{self._info_directive}" + "(flux adapter version) {version}",
            "flux_version": f"{self._info_directive}" + "(flux version) {flux_version}"
        }

        if uri:
            self.add_batch_parameter("flux_uri", uri)
            self._header_info['flux_uri'] = f"{self._info_directive}" + "(flux_uri) {flux_uri}"

    @property
    def extension(self):
        return self._extension

    def _convert_walltime_to_seconds(self, walltime):
        if isinstance(walltime, int) or isinstance(walltime, float):
            LOGGER.debug("Encountered numeric walltime = %s", str(walltime))
            return int(float(walltime) * 60.0)
        elif isinstance(walltime, str) and walltime.isnumeric():
            LOGGER.debug("Encountered numeric walltime = %s", str(walltime))
            return int(float(walltime) * 60.0)
        elif ":" in walltime:
            # Convert walltime to seconds.
            LOGGER.debug("Converting %s to seconds...", walltime)
            seconds = 0.0
            for i, value in enumerate(walltime.split(":")[::-1]):
                seconds += float(value) * (60.0 ** i)
            return seconds
        elif not walltime or (isinstance(walltime, str) and walltime == "inf"):
            return 0
        else:
            msg = \
                f"Walltime value '{walltime}' is not an integer or colon-" \
                f"separated string."
            LOGGER.error(msg)
            raise ValueError(msg)

    def pack_addtl_batch_args(self):
        """
        Normalize the allocation args and pack up into the interface specific
        groups that have associated jobspec methods, e.g. conf, setattr, setopt.

        Values of None inside a group are replaced with 1 in the returned
        mapping; the adapter's stored allocation args are left untouched.

        :return: dictionary of allocation arg groups to attach to jobspecs
        """
        addtl_batch_args = {
            arg_type: {}
            for arg_type in self._interface.addtl_alloc_arg_types()
        }
        for arg_key, arg_values in self._allocation_args.items():
            # TODO: move this into a validation function for pre launch
            #       batch args validation
            arg_type = self._interface.addtl_alloc_arg_type_map(arg_key)
            if arg_type is None:
                # NO WARNINGS HERE: args in 'misc' type handled elsewhere
                # TODO: add better mechanism for tracking which args
                #       actually get used; dicts can't do this..
                continue

            # Match default of flag types in flux cli.
            # see https://github.com/flux-framework/flux-core/blob/a3860d4dea5b5a17c473cff4385276e882275252/src/bindings/python/flux/cli/base.py#L734
            # NOTE: only doing this in alloc; let LAUNCHER cli pass through
            #       to flux cli (None values are omitted, e.g.
            #       {o: fastload: None} renders to -o fastload
            #       Python api doesn't appear to have default value handling?
            # Build a new dict: rebinding the loop variable would not change
            # the mapping, and the stored args must stay as the user gave them.
            new_arg_values = {
                key: 1 if value is None else value
                for key, value in arg_values.items()
            }

            addtl_batch_args[arg_type].update(new_arg_values)

        return addtl_batch_args

    def get_header(self, step):
        """
        Generate the header present at the top of Flux execution scripts.

        The header consists of the shebang line, the MAESTRO info lines, a
        bare '#' separator, the flux directives for every supported key
        that has a value, an optional exclusive directive, and any rendered
        additional allocation args.

        :param step: A StudyStep instance.
        :returns: A string of the header based on internal batch parameters and
                  the parameter step.
        """
        run = dict(step.run)
        batch_header = dict(self._batch)
        walltime = step.run.get("walltime", None)
        batch_header["walltime"] = \
            str(self._convert_walltime_to_seconds(walltime))

        # Unpack exclusive to modify procs/core per task
        step_exclusive = self.resolve_exclusive(
            self._exclusive, step.run.get("exclusive", None)
        )

        # Abusing 'truthiness' here to catch None and '' values too
        nodes = run.get("nodes", None)
        if nodes:
            batch_header["nodes"] = run.pop("nodes")
        elif step_exclusive['allocation']:
            LOGGER.error(
                "Invalid use of exclusive on allocation: "
                "exclusive can only be set with a node count"
            )

        # 'procs' may be absent from the step; treat that like an empty value
        if run.get("procs") and not step_exclusive['allocation']:
            batch_header["procs"] = run.pop("procs")
        batch_header["job-name"] = step.name.replace(" ", "_")
        batch_header["comment"] = step.description.replace("\n", " ")
        batch_header["flux_version"] = self._broker_version

        # Omit queue/bank directives when no value was provided (or the
        # queue was cleared because only an anonymous queue was detected);
        # rendering them empty would emit e.g. '--bank=' with no value.
        for optional_key in ("queue", "bank"):
            if not batch_header.get(optional_key):
                batch_header.pop(optional_key, None)

        modified_header = ["#!{}".format(self._exec)]

        # Process INFO lines at the start to avoid interrupting flux directives
        for info_template in self._header_info.values():
            modified_header.append(info_template.format(**batch_header))

        # Inject a blank # comment line between the info/flux directives for readability
        modified_header.append("#")

        for key, value in self._header.items():
            if key not in batch_header:
                continue
            LOGGER.debug(
                "Processing header: key=%s, value=%s, batch_header=%s",
                key, value, batch_header,
            )

            modified_header.append(value.format(**batch_header))

        # Add exclusive flag to header if requested
        if step_exclusive['allocation']:
            modified_header.append(f"{self._flux_directive}" + "--exclusive")

        # Process any optional allocation args
        for rendered_arg in self._interface.render_additional_args(self._allocation_args):
            if rendered_arg:
                # Silent pass through for old versions which don't implement any
                # interface for batch/allocation args
                modified_header.append(f"{self._flux_directive}" + rendered_arg)

        return "\n".join(modified_header)

    def get_parallelize_command(self, procs, nodes=None, **kwargs):
        """
        Build the Flux parallelization segment of the command line.

        The step's 'exclusive' setting (if any) is resolved against the
        adapter's batch level defaults, and the resulting launcher value
        is what gets forwarded to the interface.

        :param procs: Number of processors to allocate to the parallel call.
        :param nodes: Number of nodes to allocate to the parallel call
                      (default = None).
        :returns: The value returned by the interface's parallelize call
                  for the given nodes and procs.
        """
        # Step level setting, if present, is merged with the batch defaults
        requested = kwargs.pop("exclusive", None)
        resolved = self.resolve_exclusive(self._exclusive, requested)

        # TODO: fix this temp hack when standardizing the exclusive key handling
        forwarded = dict(kwargs, exclusive=resolved['launcher'])

        return self._interface.parallelize(
            procs,
            nodes=nodes,
            addtl_args=self._addl_args,
            launcher_args=self._launcher_args,
            **forwarded
        )

    def submit(self, step, path, cwd, job_map=None, env=None):
        """
        Submit a script to the Flux scheduler.

        Resource settings are read from the step's run block, normalized
        (empty strings and string encoded integers are converted), and
        handed to the Flux interface's submit call.

        :param step: The StudyStep instance this submission is based on.
        :param path: Local path to the script to be executed.
        :param cwd: Path to the current working directory.
        :param job_map: A dictionary mapping step names to their job
                        identifiers.
        :param env: A dict containing a modified environment for execution.
        :returns: A SubmissionRecord built from the submission status, the
                  return code, and the job identifier.
        :raises ValueError: If 'gpus' is not an integer value or the
                            computed number of cores is not positive.
        """
        nodes = step.run.get("nodes", None)
        processors = step.run.get("procs", 1)

        # Handle the exclusive flags, updating batch block settings (default)
        # with what's set in the step, if any given
        step_exclusive = step.run.get("exclusive", None)

        step_exclusive = self.resolve_exclusive(self._exclusive, step_exclusive)

        # TODO: track this down and normalize this upstream for all adapters
        if nodes == '':
            nodes = None

        if nodes is not None and not isinstance(nodes, int):
            nodes = int(nodes)

        # TODO: revist this after tracking down '' behavior seen with nodes
        if not isinstance(processors, int):
            if not processors:
                processors = 1  # python api requires >= 1 for procs/tasks
            else:
                processors = int(processors)

        force_broker = step.run.get("nested", True)
        walltime = \
            self._convert_walltime_to_seconds(step.run.get("walltime", 0))
        urgency = step.run.get("priority", "medium")
        urgency = self.get_priority(urgency)

        # Compute cores per task
        cores_per_task = step.run.get("cores per task", None)
        if isinstance(cores_per_task, str):
            try:
                cores_per_task = int(cores_per_task)
            except ValueError:
                # Only a failed int conversion is expected here; anything
                # else should propagate rather than be silently swallowed.
                cores_per_task = 1
        if not cores_per_task:
            cores_per_task = 1  # flux python api defaults to 1
            LOGGER.warning(
                "'cores per task' set to a non-value. Populating with a "
                "sensible default. (cores per task = %d)", cores_per_task)

        try:
            # Calculate ngpus
            ngpus = step.run.get("gpus", "0")
            ngpus = int(ngpus) if ngpus else 0
        except ValueError as val_error:
            # TODO: normalize this upstream for all adapters
            msg = f"Specified gpus '{ngpus}' is not a decimal value."
            LOGGER.error(msg)
            raise val_error

        # Calculate nprocs
        ncores = cores_per_task * processors  # What was this check doing?
        # Raise an exception if ncores is 0
        if ncores <= 0:
            msg = "Invalid number of cores specified. " \
                  "Aborting. (ncores = {})".format(ncores)
            LOGGER.error(msg)
            raise ValueError(msg)

        # Modify jobspec if exclusive to exclude procs/cores/gpus keys
        # TODO: sort out confusing mixing of per task specs here and jobspec
        #       creation calls using per slot specs...
        # TODO: refactor into shared resource modification between this and
        #       write_script/parallelize
        if nodes and step_exclusive['allocation']:
            # Should we do this here or inside submit?
            processors = nodes  # one slot per node
            cores_per_task = 1  # cores per slot >= 1 in python api
            # filter out ngpus_per_slot inside submit

        # Unpack waitable flag and pass it along if there: only pass it along if
        # it's in the step maybe, leaving each adapter to retain their defaults?
        waitable = step.run.get("waitable", False)

        # Normalize the allocation args to api flux.job.JobspecV1 expects
        packed_alloc_args = self._interface.pack_addtl_batch_args(self._allocation_args)

        # Add queue and bank; empty strings mean 'not provided'
        queue = self._batch["queue"]
        if queue == "":
            queue = None
        bank = self._batch["bank"]
        if bank == "":
            bank = None

        jobid, retcode, submit_status = \
            self._interface.submit(
                nodes, processors, cores_per_task, path, cwd, walltime, ngpus,
                job_name=step.name, force_broker=force_broker, urgency=urgency,
                waitable=waitable, queue=queue, bank=bank,
                addtl_batch_args=packed_alloc_args,
                exclusive=step_exclusive['allocation'],
            )

        return SubmissionRecord(submit_status, retcode, jobid)

    def check_jobs(self, joblist):
        """
        Query the execution status of the given jobs.

        The lookup itself is delegated to the Flux interface's
        get_statuses call. A single integer identifier is accepted and
        wrapped in a list; any exception raised by the interface is logged
        and reported as an error status.

        :param joblist: A list of job identifiers to be queried.
        :returns: The return code of the status query, and a dictionary of job
                  identifiers to their status.
        """
        LOGGER.debug("Joblist type -- %s", type(joblist))
        LOGGER.debug("Joblist contents -- %s", joblist)

        # Nothing to look up
        if not joblist:
            LOGGER.debug("Empty job list specified.")
            return JobStatusCode.OK, {}

        # Coerce a lone integer id into a list; reject everything else
        if not isinstance(joblist, list):
            LOGGER.debug("Specified parameter is not a list.")
            if not isinstance(joblist, int):
                LOGGER.debug("Unknown type. Returning an error.")
                return JobStatusCode.ERROR, {}
            LOGGER.debug("Integer found.")
            joblist = [joblist]

        try:
            query_code, statuses = self._interface.get_statuses(joblist)
        except Exception as excpt:
            LOGGER.error(str(excpt))
            return JobStatusCode.ERROR, {}

        return query_code, statuses

    def cancel_jobs(self, joblist):
        """
        For the given job list, cancel each job.

        :param joblist: A list of job identifiers to be cancelled.
        :returns: A CancellationRecord holding the cancellation status and
                  return code.
        """
        # If we don't have any jobs to cancel, report success using the same
        # record type as the non-empty path so callers get a uniform return.
        if not joblist:
            return CancellationRecord(CancelCode.OK, 0)

        c_status, r_code = self._interface.cancel(joblist)
        return CancellationRecord(c_status, r_code)

    def _state(self, flux_state):
        """
        Map a scheduler specific job state to a Study.State enum.

        :param flux_state: String representation of scheduler job status.
        :returns: A Study.State enum corresponding to parameter job_state.
        """
        raise NotImplementedError(
            "FluxScriptAdapter no longer uses the _state mapping.")

    def _write_script(self, ws_path, step):
        """
        Write a Flux script to the workspace of a workflow step.

        The main script always receives the full Flux header followed by
        the step's command. When the step defines a restart command, a
        second script is written; it receives the Flux header if the step
        is to be scheduled, and only a shebang line otherwise.

        :param ws_path: Path to the workspace directory of the step.
        :param step: An instance of a StudyStep.
        :returns: Boolean value (True if to be scheduled), the path to the
                  written script for run["cmd"], and the path to the script
                  written for run["restart"] (None if there is no restart).
        """
        to_be_scheduled, cmd, restart = self.get_scheduler_command(step)

        fname = "{}.{}".format(step.name, self._extension)
        script_path = make_safe_path(ws_path, fname)
        with open(script_path, "w") as script:
            script.write(self.get_header(step))
            cmd = "\n\n{}\n".format(cmd)
            script.write(cmd)

        if restart:
            rname = "{}.restart.{}".format(step.name, self._extension)
            # NOTE(review): the main script path goes through make_safe_path
            # while this one does not -- confirm whether that is intended.
            restart_path = os.path.join(ws_path, rname)

            with open(restart_path, "w") as script:
                if to_be_scheduled:
                    script.write(self.get_header(step))
                else:
                    # self._exec holds the bare interpreter path (get_header
                    # prefixes it the same way), so add the shebang marker.
                    script.write("#!{}".format(self._exec))

                cmd = "\n\n{}\n".format(restart)
                script.write(cmd)
        else:
            restart_path = None

        return to_be_scheduled, script_path, restart_path

    def get_priority(self, priority):
        """
        Translate a priority setting into a Flux urgency value.

        :param priority: The priority setting to translate.
        :returns: Whatever the Flux interface's get_flux_urgency call
                  returns for the given priority.
        """
        urgency = self._interface.get_flux_urgency(priority)
        return urgency

__init__(**kwargs)

Initialize an instance of the FluxScriptAdapter.

The FluxScriptAdapter is this package's interface to the Flux scheduler. This adapter constructs Flux scripts for a StudyStep based on user-set defaults and local settings present in each step.

The keyword arguments expected when the Flux adapter is instantiated are as follows:

* host: The cluster to execute scripts on.
* bank: The account to charge computing time to.
* queue: Scheduler queue scripts should be submitted to.
* nodes: The number of compute nodes to be reserved for computing.

Parameters:

Name Type Description Default
**kwargs

A dictionary with default settings for the adapter.

{}
Source code in maestrowf/interfaces/script/fluxscriptadapter.py
def __init__(self, **kwargs):
    """
    Initialize an instance of the FluxScriptAdapter.

    The FluxScriptAdapter is this package's interface to the Flux
    scheduler. This adapter constructs Flux scripts for a StudyStep based
    on user set defaults and local settings present in each step.

    The keyword arguments used when the Flux adapter is instantiated
    are as follows:
    * host: The cluster to execute scripts on.
    * bank: The account to charge computing time to.
    * queue: Scheduler queue scripts should be submitted to.
    * nodes: The number of compute nodes to be reserved for computing.
    * version: Flux interface version (default: FluxFactory.latest).
    * uri: Flux broker uri; the FLUX_URI environment variable is used
      when this is not given.
    * allocation_args: Additional arguments for the batch allocation.
    * launcher_args: Additional arguments for the launcher.
    * args: Additional arguments passed along to the parallelize call.

    :param **kwargs: A dictionary with default settings for the adapter.
    """
    super(FluxScriptAdapter, self).__init__(**kwargs)

    # Store the interface we're using
    _version = kwargs.pop("version", FluxFactory.latest)
    self.add_batch_parameter("version", _version)
    self._interface = FluxFactory.get_interface(_version)
    # Note, should we also log parsed 'base version' used when comparing
    # the adaptor/broker versions along with the raw string we get back
    # from flux.
    self._broker_version = self._interface.get_flux_version()

    # Resolve the broker uri: the study specification wins, then the
    # environment. A missing uri is deliberately not an error.
    uri = kwargs.pop("uri", None)
    if not uri:
        uri = os.environ.get("FLUX_URI", None)
        if uri:
            LOGGER.info(f"Found FLUX_URI in environment, scheduling jobs to broker uri {uri}")
        else:
            LOGGER.info("No FLUX_URI; scheduling standalone batch job to root instance")
    else:
        LOGGER.info(f"Using FLUX_URI found in study specification: {uri}")

    # NOTE: Host doesn't seem to matter for FLUX. sbatch assumes that the
    # current host is where submission occurs.
    # 'or {}' guards against keys that are present but set to None, which
    # would otherwise break the membership tests further down.
    self._allocation_args = kwargs.get("allocation_args") or {}
    LOGGER.info(f"Allocation args: {self._allocation_args}")
    self._launcher_args = kwargs.get("launcher_args") or {}
    self._addl_args = kwargs.get("args") or {}

    # Normalize the allocation and launcher args to a flat dict, removing dotpath encoding
    # TODO: workon a validation api that can be hooked up to yamlspec ingestion machinery for
    # early error messaging
    if self._allocation_args:
        self._allocation_args = self._interface.normalize_additional_args(
            self._allocation_args,
            group_name="allocation",
            filter_unknown=True
        )

    if self._launcher_args:
        self._launcher_args = self._interface.normalize_additional_args(self._launcher_args)

    # Default queue/bank to "" such that 'truthiness' can exclude them
    # from the jobspec/scripts if not provided
    queue = kwargs.pop("queue", "")
    bank = kwargs.pop("bank", "")
    available_queues = self._interface.get_broker_queues()
    # Ignore queue if specified and we detect broker only has anonymous queue
    if not available_queues and queue:
        LOGGER.info(
            "Flux Broker '%s' only has an anonymous queue: "
            "ignoring batch setting '%s'",
            uri,
            queue,
        )
        queue = ""

    self.add_batch_parameter("queue", queue)
    self.add_batch_parameter("bank", bank)

    # Check for the flag in additional args and pop it off, letting step key win later
    # NOTE: only need presence of key as this mimics cli like flag behavior
    # TODO: promote this to formally supported behavior for all adapters, push down into interfaces?
    exclusive_keys = ['x', 'exclusive']
    self._exclusive = {"allocation": False, "launcher": False}
    if all(ekey in self._allocation_args for ekey in exclusive_keys):
        LOGGER.warning("Repeated addition of exclusive flags 'x' and 'exclusive' in allocation_args.")

    alloc_eflags = [self._allocation_args.pop(ekey) for ekey in exclusive_keys if ekey in self._allocation_args]
    if alloc_eflags:
        self._exclusive['allocation'] = True

    if all(ekey in self._launcher_args for ekey in exclusive_keys):
        LOGGER.warning("Repeated addition of exclusive flags 'x' and 'exclusive' in launcher_args.")

    launcher_eflags = [self._launcher_args.pop(ekey) for ekey in exclusive_keys if ekey in self._launcher_args]
    if launcher_eflags:
        self._exclusive['launcher'] = True

    self.add_batch_parameter("exclusive", self._exclusive['allocation'])

    # Populate formally supported flux directives in the header.
    # The templates are rendered later with str.format, so the plain
    # (non f-string) pieces keep their {placeholders} intact.
    self._flux_directive = "#flux: "
    self._header = {
        "nodes": f"{self._flux_directive}" + "-N {nodes}",
        "procs": f"{self._flux_directive}" + "-n {procs}",
        # NOTE: always use seconds to guard against upstream default behavior changes
        "walltime": f"{self._flux_directive}" + "-t {walltime}s",
        "queue": f"{self._flux_directive}" + "-q {queue}",
        "bank": f"{self._flux_directive}" + "--bank={bank}",
        "job-name":
            f"{self._flux_directive}" + "--job-name=\"{job-name}\"\n"
            f"{self._flux_directive}" + "--output={job-name}.{{{{id}}}}.out\n"
            f"{self._flux_directive}" + "--error={job-name}.{{{{id}}}}.err"
    }

    self._cmd_flags = {
        "ntasks": "-n",
        "nodes": "-N",
    }
    self._extension = "flux.sh"
    self.h = None

    # Addition info flags to add to the header: MAESTRO only! flux ignores
    # anything after first non-flux-directive line so this must go last
    # NOTE(review): get_header currently emits these info lines before
    # the flux directives -- confirm which ordering is intended.
    self._info_directive = "#MAESTRO-INFO "
    self._header_info = {
        "version": f"{self._info_directive}" + "(flux adapter version) {version}",
        "flux_version": f"{self._info_directive}" + "(flux version) {flux_version}"
    }

    if uri:
        self.add_batch_parameter("flux_uri", uri)
        self._header_info['flux_uri'] = f"{self._info_directive}" + "(flux_uri) {flux_uri}"

cancel_jobs(joblist)

For the given job list, cancel each job.

Parameters:

Name Type Description Default
joblist

A list of job identifiers to be cancelled.

required

Returns:

Type Description

The return code to indicate if jobs were cancelled.

Source code in maestrowf/interfaces/script/fluxscriptadapter.py
def cancel_jobs(self, joblist):
    """
    For the given job list, cancel each job.

    :param joblist: A list of job identifiers to be cancelled.
    :returns: A CancellationRecord holding the cancellation status and
              return code.
    """
    # If we don't have any jobs to cancel, report success using the same
    # record type as the non-empty path so callers get a uniform return.
    if not joblist:
        return CancellationRecord(CancelCode.OK, 0)

    c_status, r_code = self._interface.cancel(joblist)
    return CancellationRecord(c_status, r_code)

check_jobs(joblist)

For the given job list, query execution status.

This method delegates the query to the Flux interface's get_statuses call; a single integer job identifier is accepted and treated as a one-element list.

Parameters:

Name Type Description Default
joblist

A list of job identifiers to be queried.

required

Returns:

Type Description

The return code of the status query, and a dictionary of job identifiers to their status.

Source code in maestrowf/interfaces/script/fluxscriptadapter.py
def check_jobs(self, joblist):
    """
    For the given job list, query execution status.

    The status lookup itself is delegated to the flux interface's
    ``get_statuses`` call; any exception it raises is logged and reported
    as an error status with an empty status dictionary.

    :param joblist: A list of job identifiers to be queried. A single
                    integer job identifier is also accepted.
    :returns: The return code of the status query, and a dictionary of job
              identifiers to their status.
    """
    LOGGER.debug("Joblist type -- %s", type(joblist))
    LOGGER.debug("Joblist contents -- %s", joblist)

    # Guard: nothing to look up
    if not joblist:
        LOGGER.debug("Empty job list specified.")
        return JobStatusCode.OK, {}

    # Guard: normalize a lone integer id, reject anything else non-list
    if not isinstance(joblist, list):
        LOGGER.debug("Specified parameter is not a list.")
        if not isinstance(joblist, int):
            LOGGER.debug("Unknown type. Returning an error.")
            return JobStatusCode.ERROR, {}
        LOGGER.debug("Integer found.")
        joblist = [joblist]

    try:
        query_code, job_states = self._interface.get_statuses(joblist)
    except Exception as excpt:
        LOGGER.error(str(excpt))
        return JobStatusCode.ERROR, {}

    return query_code, job_states

get_header(step)

Generate the header present at the top of Flux execution scripts.

Parameters:

Name Type Description Default
step

A StudyStep instance.

required

Returns:

Type Description

A string of the header based on internal batch parameters and the parameter step.

Source code in maestrowf/interfaces/script/fluxscriptadapter.py
def get_header(self, step):
    """
    Generate the header present at the top of Flux execution scripts.

    The header is assembled in this order: the shebang line, the MAESTRO
    info lines, a lone '#' separator, the flux directives whose keys are
    present in the batch parameters, an optional exclusive directive, and
    finally any rendered additional allocation args.

    :param step: A StudyStep instance.
    :returns: A string of the header based on internal batch parameters and
              the parameter step.
    """
    # Work on copies so neither the step nor the adapter's batch settings
    # are modified by the pops/assignments below
    run = dict(step.run)
    batch_header = dict(self._batch)
    batch_header["walltime"] = str(
        self._convert_walltime_to_seconds(run.get("walltime", None))
    )

    # Unpack exclusive to modify procs/core per task
    step_exclusive = self.resolve_exclusive(
        self._exclusive, run.get("exclusive", None)
    )

    # Truthiness check catches None and '' values in one go
    if run.get("nodes", None):
        batch_header["nodes"] = run.pop("nodes")
    elif step_exclusive['allocation']:
        LOGGER.error(
            "Invalid use of exclusive on allocation: "
            "exclusive can only be set with a node count"
        )

    # A missing 'procs' key is treated the same as an empty one
    if run.get("procs") and not step_exclusive['allocation']:
        batch_header["procs"] = run.pop("procs")
    batch_header["job-name"] = step.name.replace(" ", "_")
    batch_header["comment"] = step.description.replace("\n", " ")
    batch_header["flux_version"] = self._broker_version

    # Handle queue -> omit if anonymous queue was detected (or the key was
    # never set at all)
    if not batch_header.get("queue"):
        batch_header.pop("queue", None)  # Should we also pop bank here?

    modified_header = [f"#!{self._exec}"]

    # Process INFO lines at the start to avoid interrupting flux directives
    for info_template in self._header_info.values():
        modified_header.append(info_template.format(**batch_header))

    # Inject a blank # comment line between the info/flux directives for
    # readability
    modified_header.append("#")

    # Only emit directives for which a batch parameter is available
    for key, value in self._header.items():
        if key not in batch_header:
            continue
        LOGGER.debug(
            "Processing header: key=%s, value=%s, batch_header=%s",
            key, value, batch_header,
        )

        modified_header.append(value.format(**batch_header))

    # Add exclusive flag to header if requested
    if step_exclusive['allocation']:
        modified_header.append(f"{self._flux_directive}--exclusive")

    # Process any optional allocation args
    rendered_args = self._interface.render_additional_args(
        self._allocation_args
    )
    for rendered_arg in rendered_args:
        if rendered_arg:
            # Silent pass through for old versions which don't implement any
            # interface for batch/allocation args
            modified_header.append(f"{self._flux_directive}" + rendered_arg)

    return "\n".join(modified_header)

get_parallelize_command(procs, nodes=None, **kwargs)

Generate the FLUX parallelization segment of the command line.

Parameters:

Name Type Description Default
procs

Number of processors to allocate to the parallel call.

required
nodes

Number of nodes to allocate to the parallel call (default = 1).

None

Returns:

Type Description

A string of the parallelize command configured using nodes and procs.

Source code in maestrowf/interfaces/script/fluxscriptadapter.py
def get_parallelize_command(self, procs, nodes=None, **kwargs):
    """
    Build the FLUX parallelization segment of a command line.

    The step's ``exclusive`` setting (if passed in kwargs) is resolved
    against the adapter's batch-level setting, and only the resulting
    launcher flag is forwarded to the flux interface.

    :param procs: Number of processors to allocate to the parallel call.
    :param nodes: Number of nodes to allocate to the parallel call
                  (optional; None when not given).
    :param kwargs: Extra launcher options passed through to the interface.
    :returns: A string of the parallelize command configured using nodes
              and procs.
    """
    # Step-level setting overrides/extends the batch block default
    requested_exclusive = kwargs.pop("exclusive", None)
    resolved = self.resolve_exclusive(self._exclusive, requested_exclusive)

    # TODO: fix this temp hack when standardizing the exclusive key handling
    launch_options = dict(kwargs)
    launch_options['exclusive'] = resolved['launcher']

    return self._interface.parallelize(
        procs,
        nodes=nodes,
        addtl_args=self._addl_args,
        launcher_args=self._launcher_args,
        **launch_options
    )

pack_addtl_batch_args()

Normalize the allocation args and pack them into the interface-specific groups that have associated jobspec methods, e.g. conf, setattr, setopt.

Returns:

Type Description

dictionary of allocation arg groups to attach to jobspecs

Source code in maestrowf/interfaces/script/fluxscriptadapter.py
def pack_addtl_batch_args(self):
    """
    Normalize the allocation args and pack up into the interface specific
    groups that have associated jobspec methods, e.g. conf, setattr, setopt.

    Args whose key the interface does not map to a group are skipped.
    Values of None are replaced with 1 in the packed output; the adapter's
    stored allocation args are left untouched.

    :return: dictionary of allocation arg groups to attach to jobspecs
    """
    # Start with one empty group per arg type the interface knows about
    addtl_batch_args = {
        arg_type: {}
        for arg_type in self._interface.addtl_alloc_arg_types()
    }
    for arg_key, arg_values in self._allocation_args.items():
        # TODO: move this into a validation function for pre launch
        #       batch args validation
        arg_type = self._interface.addtl_alloc_arg_type_map(arg_key)
        if arg_type is None:
            # NO WARNINGS HERE: args in 'misc' type handled elsewhere
            # TODO: add better mechanism for tracking which args
            #       actually get used; dicts can't do this..
            continue

        # Match default of flag types in flux cli.
        # see https://github.com/flux-framework/flux-core/blob/a3860d4dea5b5a17c473cff4385276e882275252/src/bindings/python/flux/cli/base.py#L734
        # NOTE: only doing this in alloc; let LAUNCHER cli pass through
        #       to flux cli (None values are omitted, e.g.
        #       {o: fastload: None} renders to -o fastload
        #       Python api doesn't appear to have default value handling?
        # Build a new dict: rebinding the loop variable would not change
        # the stored value, and we must not mutate self._allocation_args.
        new_arg_values = {
            key: 1 if value is None else value
            for key, value in arg_values.items()
        }

        addtl_batch_args[arg_type].update(new_arg_values)

    return addtl_batch_args

submit(step, path, cwd, job_map=None, env=None)

Submit a script to the Flux scheduler.

Parameters:

Name Type Description Default
step

The StudyStep instance this submission is based on.

required
path

Local path to the script to be executed.

required
cwd

Path to the current working directory.

required
job_map

A dictionary mapping step names to their job identifiers.

None
env

A dict containing a modified environment for execution.

None

Returns:

Type Description

The return status of the submission command and job identifier.

Source code in maestrowf/interfaces/script/fluxscriptadapter.py
def submit(self, step, path, cwd, job_map=None, env=None):
    """
    Submit a script to the Flux scheduler.

    Resource settings are read from ``step.run``, normalized to the types
    the flux interface expects, and handed to the interface's submit call
    together with the adapter's queue, bank and allocation args.

    :param step: The StudyStep instance this submission is based on.
    :param path: Local path to the script to be executed.
    :param cwd: Path to the current working directory.
    :param job_map: A dictionary mapping step names to their job
                    identifiers.
    :param env: A dict containing a modified environment for execution.
    :returns: A SubmissionRecord holding the submission status, the return
              code of the submission and the job identifier.
    :raises ValueError: If 'gpus' is not a decimal value or the computed
                        number of cores is not positive.
    """
    nodes = step.run.get("nodes", None)
    processors = step.run.get("procs", 1)

    # Handle the exclusive flags, updating batch block settings (default)
    # with what's set in the step, if any given
    step_exclusive = step.run.get("exclusive", None)

    step_exclusive = self.resolve_exclusive(self._exclusive, step_exclusive)

    # TODO: track this down and normalize this upstream for all adapters
    if nodes == '':
        nodes = None

    if nodes is not None and not isinstance(nodes, int):
        nodes = int(nodes)

    # TODO: revisit this after tracking down '' behavior seen with nodes
    if not isinstance(processors, int):
        if not processors:
            processors = 1  # python api requires >= 1 for procs/tasks
        else:
            processors = int(processors)

    force_broker = step.run.get("nested", True)
    walltime = \
        self._convert_walltime_to_seconds(step.run.get("walltime", 0))
    urgency = step.run.get("priority", "medium")
    urgency = self.get_priority(urgency)

    # Compute cores per task
    cores_per_task = step.run.get("cores per task", None)
    if isinstance(cores_per_task, str):
        try:
            cores_per_task = int(cores_per_task)
        except ValueError:
            # Non-numeric string (including ''): fall back to the default
            cores_per_task = 1
    if not cores_per_task:
        cores_per_task = 1  # flux python api defaults to 1
        LOGGER.warning(
            "'cores per task' set to a non-value. Populating with a "
            "sensible default. (cores per task = %d)", cores_per_task)

    try:
        # Calculate ngpus
        ngpus = step.run.get("gpus", "0")
        ngpus = int(ngpus) if ngpus else 0
    except ValueError:
        # TODO: normalize this upstream for all adapters
        msg = f"Specified gpus '{ngpus}' is not a decimal value."
        LOGGER.error(msg)
        raise

    # Calculate nprocs
    ncores = cores_per_task * processors  # What was this check doing?
    # Raise an exception if ncores is not positive
    if ncores <= 0:
        msg = "Invalid number of cores specified. " \
              "Aborting. (ncores = {})".format(ncores)
        LOGGER.error(msg)
        raise ValueError(msg)

    # Modify jobspec if exclusive to exclude procs/cores/gpus keys
    # TODO: sort out confusing mixing of per task specs here and jobspec
    #       creation calls using per slot specs...
    # TODO: refactor into shared resource modification between this and
    #       write_script/parallelize
    if nodes and step_exclusive['allocation']:
        # Should we do this here or inside submit?
        processors = nodes  # one slot per node
        cores_per_task = 1  # cores per slot >= 1 in python api
        # filter out ngpus_per_slot inside submit

    # Unpack waitable flag and pass it along if there: only pass it along if
    # it's in the step maybe, leaving each adapter to retain their defaults?
    waitable = step.run.get("waitable", False)

    # Normalize the allocation args to api flux.job.JobspecV1 expects
    packed_alloc_args = self._interface.pack_addtl_batch_args(self._allocation_args)

    # Add queue and bank; empty strings are passed on as None
    queue = self._batch["queue"]
    if queue == "":
        queue = None
    bank = self._batch["bank"]
    if bank == "":
        bank = None

    jobid, retcode, submit_status = \
        self._interface.submit(
            nodes, processors, cores_per_task, path, cwd, walltime, ngpus,
            job_name=step.name, force_broker=force_broker, urgency=urgency,
            waitable=waitable, queue=queue, bank=bank,
            addtl_batch_args=packed_alloc_args,
            exclusive=step_exclusive['allocation'],
        )

    return SubmissionRecord(submit_status, retcode, jobid)