Executiongraph

Module for the execution of DAG workflows.

ExecutionGraph

Bases: DAG, PickleInterface

Datastructure that tracks, executes, and reports on study execution.

The ExecutionGraph is used to manage, monitor, and interact with tasks and the scheduler. This class searches its graph for tasks that are ready to run, marks tasks as complete, and schedules ready tasks.

The ExecutionGraph is also where functionality for checking task status and logic for automatically managing, directing, and manipulating the workflow belongs. In short, any logic that automatically manipulates the workflow or adds monitoring on top of it should live in this class.
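
A minimal, hedged usage sketch of the lifecycle described above (not taken from the package's own examples): build the graph, attach an adapter, and generate scripts. The import path, the 'local' adapter key, and the step objects build_step and run_step (stand-ins for StudyStep instances) are assumptions that depend on your Maestro installation.

from maestrowf.datastructures.core.executiongraph import ExecutionGraph

# Build a two-step graph where 'run-sim' depends on 'build-code'.
graph = ExecutionGraph(submission_attempts=3, submission_throttle=2)
graph.add_description("sample_study", "A small two-step study")
graph.set_adapter({"type": "local"})  # key assumed; must be registered with ScriptAdapterFactory

# build_step and run_step are hypothetical StudyStep instances.
graph.add_step("build-code", build_step, "/tmp/sample/build-code", restart_limit=1)
graph.add_step("run-sim", run_step, "/tmp/sample/run-sim", restart_limit=2)
graph.add_connection("build-code", "run-sim")

graph.log_description()
graph.generate_scripts()
# Execution is then driven by polling execute_ready_steps(); see that method below.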

Source code in maestrowf/datastructures/core/executiongraph.py
class ExecutionGraph(DAG, PickleInterface):
    """
    Datastructure that tracks, executes, and reports on study execution.

    The ExecutionGraph is used to manage, monitor, and interact with tasks and
    the scheduler. This class searches its graph for tasks that are ready to
    run, marks tasks as complete, and schedules ready tasks.

    The ExecutionGraph is also where functionality for checking task status
    and logic for automatically managing, directing, and manipulating the
    workflow belongs. In short, any logic that automatically manipulates
    the workflow or adds monitoring on top of it should live in this class.
    """

    def __init__(self, submission_attempts=1, submission_throttle=0,
                 use_tmp=False, dry_run=False):
        """
        Initialize a new instance of an ExecutionGraph.

        :param submission_attempts: Number of attempted submissions before
            marking a step as failed.
        :param submission_throttle: Maximum number of scheduled in-progress
            submissions.
        :param use_tmp: A Boolean value that when set to 'True' designates
            that ExecutionGraph should use temporary files for output.
        :param dry_run: A Boolean value that when set to 'True' generates
            scripts but marks steps as DRYRUN instead of submitting them.
        """
        super(ExecutionGraph, self).__init__()
        # Member variables for execution.
        self._adapter = None
        self._description = OrderedDict()

        # Generate tempdir (if specified)
        if use_tmp:
            self._tmp_dir = tempfile.mkdtemp()
        else:
            self._tmp_dir = ""

        # Sets to track progress.
        self.completed_steps = set([SOURCE])
        self.in_progress = set()
        self.failed_steps = set()
        self.cancelled_steps = set()
        self.ready_steps = deque()
        self.is_canceled = False

        self._status_order = 'bfs'  # Set status order type
        self._status_subtree = None  # Cache bfs_subtree for status writing

        # Values for management of the DAG. Things like submission attempts,
        # throttling, etc. should be listed here.
        self._submission_attempts = submission_attempts
        self._submission_throttle = submission_throttle
        self.dry_run = dry_run

        # A map that tracks the dependencies of a step.
        # NOTE: I don't know how performant the Python dict structure is, but
        # we'll use it for now. I think this may want to be changed to an AVL
        # tree or something of that nature to guarantee worst case performance.
        self._dependencies = {}

        LOGGER.info(
            "\n------------------------------------------\n"
            "Submission attempts =       %d\n"
            "Submission throttle limit = %d\n"
            "Use temporary directory =   %s\n"
            "Tmp Dir = %s\n"
            "------------------------------------------",
            submission_attempts, submission_throttle, use_tmp, self._tmp_dir
        )

        # Error check that the submission values are valid.
        if self._submission_attempts < 1:
            _msg = "Submission attempts should always be greater than 0. " \
                   "Received a value of {}.".format(self._submission_attempts)
            LOGGER.error(_msg)
            raise ValueError(_msg)

        if self._submission_throttle < 0:
            _msg = "Throttling should be 0 for unthrottled or a positive " \
                   "integer for the number of allowed inflight jobs. " \
                   "Received a value of {}.".format(self._submission_throttle)
            LOGGER.error(_msg)
            raise ValueError(_msg)

    def _check_tmp_dir(self):
        """Check and recreate the tempdir should it have been erased."""
        # If we've specified a tmp dir and the previous tmp dir doesn't exist
        # recreate it.
        if self._tmp_dir and not os.path.exists(self._tmp_dir):
            self._tmp_dir = tempfile.mkdtemp()

    def add_step(self, name, step, workspace, restart_limit, params=None):
        """
        Add a StepRecord to the ExecutionGraph.

        :param name: Name of the step to be added.
        :param step: StudyStep instance to be recorded.
        :param workspace: Directory path for the step's working directory.
        :param restart_limit: Upper limit on the number of restart attempts.
        :param params: Iterable of tuples of step parameter names, values
        """
        data = {
                    "step":          step,
                    "state":         State.INITIALIZED,
                    "workspace":     workspace,
                    "restart_limit": restart_limit,
                }
        record = _StepRecord(**data)
        if params:
            record.add_params(params)

        self._dependencies[name] = set()
        super(ExecutionGraph, self).add_node(name, record)

    def add_connection(self, parent, step):
        """
        Add a connection between two steps in the ExecutionGraph.

        :param parent: The parent step that is required to execute 'step'
        :param step: The dependent step that relies on parent.
        """
        self.add_edge(parent, step)
        self._dependencies[step].add(parent)

    def set_adapter(self, adapter):
        """
        Set the adapter used to interface for scheduling tasks.

        :param adapter: Dictionary of adapter settings to be used when
            launching the graph.
        """
        if not adapter:
            # If we have no adapter specified, assume sequential execution.
            self._adapter = None
            return

        if not isinstance(adapter, dict):
            msg = "Adapter settings must be contained in a dictionary."
            LOGGER.error(msg)
            raise TypeError(msg)

        # Check that the adapter type is registered with the
        # ScriptAdapterFactory.
        if adapter["type"] not in ScriptAdapterFactory.get_valid_adapters():
            msg = "'{}' adapter must be specfied in ScriptAdapterFactory." \
                  .format(adapter)
            LOGGER.error(msg)
            raise TypeError(msg)

        self._adapter = adapter

    def add_description(self, name, description, **kwargs):
        """
        Add a study description to the ExecutionGraph instance.

        :param name: Name of the study.
        :param description: Description of the study.
        """
        self._description["name"] = name
        self._description["description"] = description
        self._description.update(kwargs)

    @property
    def name(self):
        """
        Return the name for the study in the ExecutionGraph instance.

        :returns: A string of the name of the study.
        """
        return self._description["name"]

    @name.setter
    def name(self, value):
        """
        Set the name for the study in the ExecutionGraph instance.

        :param value: A string of the name for the study.
        """
        self._description["name"] = value

    @property
    def description(self):
        """
        Return the description for the study in the ExecutionGraph instance.

        :returns: A string of the description for the study.
        """
        return self._description["description"]

    @description.setter
    def description(self, value):
        """
        Set the description for the study in the ExecutionGraph instance.

        :param value: A string of the description for the study.
        """
        self._description["description"] = value

    def log_description(self):
        """Log the description of the ExecutionGraph."""
        desc = ["{}: {}".format(key, value)
                for key, value in self._description.items()]
        desc = "\n".join(desc)
        LOGGER.info(
            "\n==================================================\n"
            "%s\n"
            "==================================================\n",
            desc
        )

    def generate_scripts(self):
        """
        Generate the scripts for all steps in the ExecutionGraph.

        The generate_scripts method scans the ExecutionGraph instance and uses
        the stored adapter to write executable scripts for either local or
        scheduled execution. If a restart command is specified, a restart
        script will be generated for that record.
        """
        # An adapter must be specified
        if not self._adapter:
            msg = "Adapter not found. Specify a ScriptAdapter using " \
                  "set_adapter."
            LOGGER.error(msg)
            raise ValueError(msg)

        # Set up the adapter.
        LOGGER.info("Generating scripts...")
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)

        self._check_tmp_dir()
        for key, record in self.values.items():
            if key == SOURCE:
                continue

            # Record generates its own script.
            record.setup_workspace()
            record.generate_script(adapter, self._tmp_dir)

    def _execute_record(self, record, adapter, restart=False):
        """
        Execute a StepRecord.

        :param record: The StepRecord to be executed.
        :param adapter: An instance of the adapter to be used for cluster
        submission.
        :param restart: True if the record needs restarting, False otherwise.
        """
        # Logging for debugging.
        LOGGER.info("Calling execute for StepRecord '%s'", record.name)

        num_restarts = 0    # Number of times submission has been attempted.
        retcode = None      # Execution return code.

        # Keep attempting submission while:
        # 1. the submission return code is not OK, and
        # 2. num_restarts is less than self._submission_attempts.
        self._check_tmp_dir()

        # Only set up the workspace on the initial submission.
        if not restart:
            LOGGER.debug("Setting up workspace for '%s' at %s",
                         record.name, str(datetime.now()))
            # Generate the script for execution on the fly.
            record.setup_workspace()    # Generate the workspace.
            record.generate_script(adapter, self._tmp_dir)

        if self.dry_run:
            record.mark_end(State.DRYRUN)
            self.completed_steps.add(record.name)
            return

        while retcode != SubmissionCode.OK and \
                num_restarts < self._submission_attempts:
            LOGGER.info("Attempting submission of '%s' (attempt %d of %d)...",
                        record.name, num_restarts + 1,
                        self._submission_attempts)

            # We're not restarting -- submit as usual.
            if not restart:
                LOGGER.debug("Calling 'execute' on '%s' at %s",
                             record.name, str(datetime.now()))
                retcode = record.execute(adapter)
            # Otherwise, it's a restart.
            else:
                # If the restart is specified, use the record restart script.
                LOGGER.debug("Calling 'restart' on '%s' at %s",
                             record.name, str(datetime.now()))
                # Generate the script for execution on the fly.
                record.generate_script(adapter, self._tmp_dir)
                retcode = record.restart(adapter)

            # Increment the number of restarts we've attempted.
            LOGGER.debug("Completed submission attempt %d", num_restarts)
            num_restarts += 1
            sleep((random.random() + 1) * num_restarts)

        if retcode == SubmissionCode.OK:
            self.in_progress.add(record.name)

            if record.is_local_step:
                LOGGER.info("Local step %s executed with status OK. Complete.",
                            record.name)
                record.mark_end(State.FINISHED)
                self.completed_steps.add(record.name)
                self.in_progress.remove(record.name)
        else:
            # Find the subtree, because anything dependent on this step now
            # failed.
            LOGGER.warning("'%s' failed to submit properly. "
                           "Step failed.", record.name)
            path, parent = self.bfs_subtree(record.name)
            for node in path:
                self.failed_steps.add(node)
                self.values[node].mark_end(State.FAILED)

        # After execution state debug logging.
        LOGGER.debug("After execution of '%s' -- New state is %s.",
                     record.name, record.status)

    @property
    def status_subtree(self):
        """Cache the status ordering to improve scaling"""
        if not self._status_subtree:
            if self._status_order == 'bfs':
                subtree, _ = self.bfs_subtree("_source")

            elif self._status_order == 'dfs':
                subtree, _ = self.dfs_subtree("_source", par="_source")

            self._status_subtree = [key for key in subtree
                                    if key != '_source']

        return self._status_subtree

    def write_status(self, path):
        """Write the status of the DAG to a CSV file."""
        header = "Step Name,Job ID,Workspace,State,Run Time,Elapsed Time," \
                 "Start Time,Submit Time,End Time,Number Restarts,Params"
        status = [header]

        for key in self.status_subtree:
            value = self.values[key]

            jobid_str = "--"
            if value.jobid:
                jobid_str = str(value.jobid[-1])

            # Include step root in workspace when parameterized
            if list(value.params.items()):
                ws = os.path.join(
                    * os.path.normpath(
                        value.workspace.value).split(os.sep)[-2:]
                )
            else:
                ws = os.path.split(value.workspace.value)[1]

            _ = [
                    value.name, jobid_str,
                    ws,
                    str(value.status.name), value.run_time, value.elapsed_time,
                    value.time_start, value.time_submitted, value.time_end,
                    str(value.restarts),
                    ";".join(["{}:{}".format(param, value)
                              for param, value in value.params.items()])
                ]
            _ = ",".join(_)
            status.append(_)

        stat_path = os.path.join(path, "status.csv")
        lock_path = os.path.join(path, ".status.lock")
        lock = FileLock(lock_path)
        try:
            with lock.acquire(timeout=10):
                with open(stat_path, "w+") as stat_file:
                    stat_file.write("\n".join(status))
        except Timeout:
            pass

    def _check_study_completion(self):
        # We cancelled; report the study as cancelled.
        if self.is_canceled:
            LOGGER.info("Cancelled -- completing study.")
            return StudyStatus.CANCELLED

        # check for completion of all steps
        resolved_set = \
            self.completed_steps | self.failed_steps | self.cancelled_steps
        if not set(self.values.keys()) - resolved_set:
            # some steps were cancelled and is_canceled wasn't set
            if len(self.cancelled_steps) > 0:
                logging.info("'%s' was cancelled. Returning.", self.name)
                return StudyStatus.CANCELLED

            # some steps failed, indicating study failure
            if len(self.failed_steps) > 0:
                logging.info("'%s' is complete with failures. Returning.",
                             self.name)
                return StudyStatus.FAILURE

            # everything completed; we are done
            logging.info("'%s' is complete. Returning.", self.name)
            return StudyStatus.FINISHED

        return StudyStatus.RUNNING

    def execute_ready_steps(self):
        """
        Execute any steps whose dependencies are satisfied.

        The 'execute_ready_steps' method is the core of how the ExecutionGraph
        manages execution. This method does the following:

        * Checks the status of existing jobs that are executing and updates
          the state if changed.
        * Finds steps that are initialized and determines what can be run
          based on satisfied dependencies and executes steps whose
          dependencies are met.

        :returns: A StudyStatus enum indicating whether the study is
            running, finished, failed, or cancelled.
        """
        # TODO: We may want to move this to a singleton somewhere
        # so we can guarantee that all steps use the same adapter.
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)

        if not self.dry_run:
            LOGGER.debug("Checking status check...")
            retcode, job_status = self.check_study_status()
        else:
            LOGGER.debug("DRYRUN: Skipping status check...")
            retcode = JobStatusCode.OK
            job_status = {}

        LOGGER.debug("Checked status (retcode %s)-- %s", retcode, job_status)

        # For now, if we can't check the status something is wrong.
        # Don't modify the DAG.
        if retcode == JobStatusCode.ERROR:
            msg = "Job status check failed -- Aborting."
            LOGGER.error(msg)
            raise RuntimeError(msg)
        elif retcode == JobStatusCode.OK:
            # For the status of each currently in progress job, check its
            # state.
            cleanup_steps = set()  # Steps that are in progress showing failed.

            for name, status in job_status.items():
                LOGGER.debug("Checking job '%s' with status %s.", name, status)
                record = self.values[name]

                if status == State.FINISHED:
                    # Mark the step complete and notate its end time.
                    record.mark_end(State.FINISHED)
                    LOGGER.info("Step '%s' marked as finished. Adding to "
                                "complete set.", name)
                    self.completed_steps.add(name)
                    self.in_progress.remove(name)

                elif status == State.RUNNING:
                    # When we detect that a step is running, mark it.
                    LOGGER.info("Step '%s' found to be running.", record.name)
                    record.mark_running()

                elif status == State.TIMEDOUT:
                    # Execute the restart script.
                    # If a restart script doesn't exist, re-run the command.
                    # If we're under the restart limit, attempt a restart.
                    if record.can_restart:
                        if record.mark_restart():
                            LOGGER.info(
                                "Step '%s' timed out. Restarting (%s of %s).",
                                name, record.restarts, record.restart_limit
                            )
                            self._execute_record(record, adapter, restart=True)
                        else:
                            LOGGER.info("'%s' has been restarted %s of %s "
                                        "times. Marking step and all "
                                        "descendents as failed.",
                                        name,
                                        record.restarts,
                                        record.restart_limit)
                            self.in_progress.remove(name)
                            cleanup_steps.update(self.bfs_subtree(name)[0])
                    # Otherwise, we can't restart so mark the step timed out.
                    else:
                        LOGGER.info("'%s' timed out, but cannot be restarted."
                                    " Marked as TIMEDOUT.", name)
                        # Mark that the step ended due to TIMEOUT.
                        record.mark_end(State.TIMEDOUT)
                        # Remove from in progress since it no longer is.
                        self.in_progress.remove(name)
                        # Add the subtree to the clean up steps
                        cleanup_steps.update(self.bfs_subtree(name)[0])
                        # Remove the current step; cleanup_steps is used to
                        # mark steps definitively as failed.
                        cleanup_steps.remove(name)
                        # Add the current step to failed.
                        self.failed_steps.add(name)

                elif status == State.HWFAILURE:
                    # TODO: Need to make sure that we do this a finite number
                    # of times.
                    # Resubmit the cmd.
                    LOGGER.warning("Hardware failure detected. Attempting to "
                                   "resubmit step '%s'.", name)
                    # We can just let the logic below handle submission with
                    # everything else.
                    self.ready_steps.append(name)

                elif status == State.FAILED:
                    LOGGER.warning(
                        "Job failure reported. Aborting %s -- flagging all "
                        "dependent jobs as failed.",
                        name
                    )
                    self.in_progress.remove(name)
                    record.mark_end(State.FAILED)
                    cleanup_steps.update(self.bfs_subtree(name)[0])

                elif status == State.UNKNOWN:
                    record.mark_end(State.UNKNOWN)
                    LOGGER.info(
                        "Step '%s' found in UNKNOWN state. Step was found "
                        "in '%s' state previously, marking as UNKNOWN. "
                        "Adding to failed steps.",
                        name, record.status)
                    cleanup_steps.update(self.bfs_subtree(name)[0])
                    self.in_progress.remove(name)

                elif status == State.CANCELLED:
                    LOGGER.info("Step '%s' was cancelled.", name)
                    self.in_progress.remove(name)
                    record.mark_end(State.CANCELLED)

            # Let's handle all the failed steps in one go.
            for node in cleanup_steps:
                self.failed_steps.add(node)
                self.values[node].mark_end(State.FAILED)

        # Now that we've checked the statuses of existing jobs, check whether
        # any pending steps now have all of their dependencies met.
        for key in self.values.keys():
            # We MUST dereference from the key. If we use values.items(), a
            # generator gets produced which will give us a COPY of a record and
            # not the actual record.
            record = self.values[key]

            # A completed step by definition has had its dependencies met.
            # Skip it.
            if key in self.completed_steps:
                LOGGER.debug("'%s' in completed set, skipping.", key)
                continue

            LOGGER.debug("Checking %s -- %s", key, record.jobid)
            # If the record is only INITIALIZED, we have encountered a step
            # that needs consideration.
            if record.status == State.INITIALIZED:
                LOGGER.debug("'%s' found to be initialized. Checking "
                             "dependencies. ", key)

                LOGGER.debug(
                    "Unfulfilled dependencies: %s",
                    self._dependencies[key])

                s_completed = filter(
                    lambda x: x in self.completed_steps,
                    self._dependencies[key])
                self._dependencies[key] = \
                    self._dependencies[key] - set(s_completed)
                LOGGER.debug(
                    "Completed dependencies: %s\n"
                    "Remaining dependencies: %s",
                    s_completed, self._dependencies[key])

                # If the gating dependencies set is empty, we can execute.
                if not self._dependencies[key]:
                    if key not in self.ready_steps:
                        LOGGER.debug("All dependencies completed. Staging.")
                        self.ready_steps.append(key)
                    else:
                        LOGGER.debug("Already staged. Passing.")
                        continue

        # We now have a collection of ready steps. Execute.
        # If we don't have a submission limit, go ahead and submit all.
        if self._submission_throttle == 0:
            LOGGER.info("Launching all ready steps...")
            _available = len(self.ready_steps)
        # Else, we have a limit -- adhere to it.
        else:
            # Compute the number of available slots we have for execution.
            _available = self._submission_throttle - len(self.in_progress)
            # Available slots should never be negative, but on the off chance
            # we are in a slot deficit, then we will just say none are free.
            _available = max(0, _available)
            # Now, we need to take the min of the length of the queue and the
            # computed number of slots. We could have free slots, but have less
            # in the queue.
            _available = min(_available, len(self.ready_steps))
            LOGGER.info("Found %d available slots...", _available)

        for i in range(0, _available):
            # Pop the record and execute using the helper method.
            _record = self.values[self.ready_steps.popleft()]

            # If we get to this point and we've cancelled, cancel the record.
            if self.is_canceled:
                LOGGER.info("Cancelling '%s' -- continuing.", _record.name)
                _record.mark_end(State.CANCELLED)
                self.cancelled_steps.add(_record.name)
                continue

            LOGGER.debug("Launching job %d -- %s", i, _record.name)
            self._execute_record(_record, adapter)

        # check the status of the study upon finishing this round of execution
        completion_status = self._check_study_completion()
        return completion_status

    def check_study_status(self):
        """
        Check the status of currently executing steps in the graph.

        This method is used to check the status of all currently in progress
        steps in the ExecutionGraph. Each ExecutionGraph stores the adapter
        used to generate and execute its scripts.
        """
        # Set up the job list and the map to get back to step names.
        joblist = []
        jobmap = {}
        for step in self.in_progress:
            jobid = self.values[step].jobid[-1]
            joblist.append(jobid)
            jobmap[jobid] = step

        # Grab the adapter from the ScriptAdapterFactory.
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)
        # Use the adapter to grab the job statuses.
        retcode, job_status = adapter.check_jobs(joblist)
        # Map the job identifiers back to step names.
        step_status = {jobmap[jobid]: status
                       for jobid, status in job_status.items()}

        # Based on return code, log something different.
        if retcode == JobStatusCode.OK:
            LOGGER.info("Jobs found for user '%s'.", getpass.getuser())
            return retcode, step_status
        elif retcode == JobStatusCode.NOJOBS:
            LOGGER.info("No jobs found.")
            return retcode, step_status
        else:
            msg = "Unknown Error (Code = {})".format(retcode)
            LOGGER.error(msg)
            return retcode, step_status

    def cancel_study(self):
        """Cancel the study."""
        joblist = []
        for step in self.in_progress:
            jobid = self.values[step].jobid[-1]
            joblist.append(jobid)

        # Grab the adapter from the ScriptAdapterFactory.
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)

        # cancel our jobs
        crecord = adapter.cancel_jobs(joblist)
        self.is_canceled = True

        if crecord.cancel_status == CancelCode.OK:
            LOGGER.info("Successfully requested to cancel all jobs.")
        elif crecord.cancel_status == CancelCode.ERROR:
            LOGGER.error(
                "Failed to cancel jobs. (Code = %s)", crecord.return_code)
        else:
            LOGGER.error("Unknown Error (Code = %s)", crecord.return_code)

        return crecord.cancel_status

    def cleanup(self):
        """Clean up output produced by the ExecutionGraph."""
        if self._tmp_dir:
            shutil.rmtree(self._tmp_dir, ignore_errors=True)

description writable property

Return the description for the study in the ExecutionGraph instance.

Returns:

    A string of the description for the study.

name writable property

Return the name for the study in the ExecutionGraph instance.

Returns:

    A string of the name of the study.

status_subtree property

Cache the status ordering to improve scaling

__init__(submission_attempts=1, submission_throttle=0, use_tmp=False, dry_run=False)

Initialize a new instance of an ExecutionGraph.

Parameters:

    submission_attempts (default: 1): Number of attempted submissions before marking a step as failed.
    submission_throttle (default: 0): Maximum number of scheduled in-progress submissions.
    use_tmp (default: False): A Boolean value that when set to 'True' designates that ExecutionGraph should use temporary files for output.
    dry_run (default: False): A Boolean value that when set to 'True' generates scripts but marks steps as DRYRUN instead of submitting them.
Source code in maestrowf/datastructures/core/executiongraph.py
def __init__(self, submission_attempts=1, submission_throttle=0,
             use_tmp=False, dry_run=False):
    """
    Initialize a new instance of an ExecutionGraph.

    :param submission_attempts: Number of attempted submissions before
        marking a step as failed.
    :param submission_throttle: Maximum number of scheduled in-progress
        submissions.
    :param use_tmp: A Boolean value that when set to 'True' designates
        that ExecutionGraph should use temporary files for output.
    :param dry_run: A Boolean value that when set to 'True' generates
        scripts but marks steps as DRYRUN instead of submitting them.
    """
    super(ExecutionGraph, self).__init__()
    # Member variables for execution.
    self._adapter = None
    self._description = OrderedDict()

    # Generate tempdir (if specified)
    if use_tmp:
        self._tmp_dir = tempfile.mkdtemp()
    else:
        self._tmp_dir = ""

    # Sets to track progress.
    self.completed_steps = set([SOURCE])
    self.in_progress = set()
    self.failed_steps = set()
    self.cancelled_steps = set()
    self.ready_steps = deque()
    self.is_canceled = False

    self._status_order = 'bfs'  # Set status order type
    self._status_subtree = None  # Cache bfs_subtree for status writing

    # Values for management of the DAG. Things like submission attempts,
    # throttling, etc. should be listed here.
    self._submission_attempts = submission_attempts
    self._submission_throttle = submission_throttle
    self.dry_run = dry_run

    # A map that tracks the dependencies of a step.
    # NOTE: I don't know how performant the Python dict structure is, but
    # we'll use it for now. I think this may want to be changed to an AVL
    # tree or something of that nature to guarantee worst case performance.
    self._dependencies = {}

    LOGGER.info(
        "\n------------------------------------------\n"
        "Submission attempts =       %d\n"
        "Submission throttle limit = %d\n"
        "Use temporary directory =   %s\n"
        "Tmp Dir = %s\n"
        "------------------------------------------",
        submission_attempts, submission_throttle, use_tmp, self._tmp_dir
    )

    # Error check that the submission values are valid.
    if self._submission_attempts < 1:
        _msg = "Submission attempts should always be greater than 0. " \
               "Received a value of {}.".format(self._submission_attempts)
        LOGGER.error(_msg)
        raise ValueError(_msg)

    if self._submission_throttle < 0:
        _msg = "Throttling should be 0 for unthrottled or a positive " \
               "integer for the number of allowed inflight jobs. " \
               "Received a value of {}.".format(self._submission_throttle)
        LOGGER.error(_msg)
        raise ValueError(_msg)
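
As a hedged illustration of the constructor's validation (based on the checks in the source above): a submission_throttle of 0 means unthrottled submission, submission_attempts must be at least 1, and dry_run generates scripts without submitting anything.

graph = ExecutionGraph(submission_attempts=2,   # retry each submission once
                       submission_throttle=0,   # 0 means no limit on in-flight jobs
                       use_tmp=False,
                       dry_run=False)

try:
    ExecutionGraph(submission_attempts=0)       # rejected: must be greater than 0
except ValueError as err:
    print(err)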

add_connection(parent, step)

Add a connection between two steps in the ExecutionGraph.

Parameters:

    parent (required): The parent step that is required to execute 'step'.
    step (required): The dependent step that relies on parent.
Source code in maestrowf/datastructures/core/executiongraph.py
def add_connection(self, parent, step):
    """
    Add a connection between two steps in the ExecutionGraph.

    :param parent: The parent step that is required to execute 'step'
    :param step: The dependent step that relies on parent.
    """
    self.add_edge(parent, step)
    self._dependencies[step].add(parent)

add_description(name, description, **kwargs)

Add a study description to the ExecutionGraph instance.

Parameters:

    name (required): Name of the study.
    description (required): Description of the study.
Source code in maestrowf/datastructures/core/executiongraph.py
def add_description(self, name, description, **kwargs):
    """
    Add a study description to the ExecutionGraph instance.

    :param name: Name of the study.
    :param description: Description of the study.
    """
    self._description["name"] = name
    self._description["description"] = description
    self._description.update(kwargs)

add_step(name, step, workspace, restart_limit, params=None)

Add a StepRecord to the ExecutionGraph.

Parameters:

    name (required): Name of the step to be added.
    step (required): StudyStep instance to be recorded.
    workspace (required): Directory path for the step's working directory.
    restart_limit (required): Upper limit on the number of restart attempts.
    params (default: None): Iterable of tuples of step parameter names, values.
Source code in maestrowf/datastructures/core/executiongraph.py
def add_step(self, name, step, workspace, restart_limit, params=None):
    """
    Add a StepRecord to the ExecutionGraph.

    :param name: Name of the step to be added.
    :param step: StudyStep instance to be recorded.
    :param workspace: Directory path for the step's working directory.
    :param restart_limit: Upper limit on the number of restart attempts.
    :param params: Iterable of tuples of step parameter names, values
    """
    data = {
                "step":          step,
                "state":         State.INITIALIZED,
                "workspace":     workspace,
                "restart_limit": restart_limit,
            }
    record = _StepRecord(**data)
    if params:
        record.add_params(params)

    self._dependencies[name] = set()
    super(ExecutionGraph, self).add_node(name, record)
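
A short, hedged sketch of adding a parameterized step; sim_step and the names shown are hypothetical, and params is an iterable of (parameter name, value) tuples that are attached to the step's record.

graph.add_step(
    name="run-sim_ITER.10",
    step=sim_step,                         # hypothetical StudyStep instance
    workspace="/tmp/sample/run-sim/ITER.10",
    restart_limit=2,
    params=[("ITER", 10), ("SIZE", 100)],  # recorded on the _StepRecord
)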

cancel_study()

Cancel the study.

Source code in maestrowf/datastructures/core/executiongraph.py
def cancel_study(self):
    """Cancel the study."""
    joblist = []
    for step in self.in_progress:
        jobid = self.values[step].jobid[-1]
        joblist.append(jobid)

    # Grab the adapter from the ScriptAdapterFactory.
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)

    # cancel our jobs
    crecord = adapter.cancel_jobs(joblist)
    self.is_canceled = True

    if crecord.cancel_status == CancelCode.OK:
        LOGGER.info("Successfully requested to cancel all jobs.")
    elif crecord.cancel_status == CancelCode.ERROR:
        LOGGER.error(
            "Failed to cancel jobs. (Code = %s)", crecord.return_code)
    else:
        LOGGER.error("Unknown Error (Code = %s)", crecord.return_code)

    return crecord.cancel_status
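
A hedged usage sketch: given an ExecutionGraph instance graph with jobs in flight, cancel_study cancels them through the stored adapter and returns the resulting CancelCode (the import path below is an assumption).

from maestrowf.abstracts.enums import CancelCode  # assumed import path

if graph.cancel_study() == CancelCode.OK:
    print("Cancellation requested for all in-progress jobs.")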

check_study_status()

Check the status of currently executing steps in the graph.

This method is used to check the status of all currently in progress steps in the ExecutionGraph. Each ExecutionGraph stores the adapter used to generate and execute its scripts.

Source code in maestrowf/datastructures/core/executiongraph.py
def check_study_status(self):
    """
    Check the status of currently executing steps in the graph.

    This method is used to check the status of all currently in progress
    steps in the ExecutionGraph. Each ExecutionGraph stores the adapter
    used to generate and execute its scripts.
    """
    # Set up the job list and the map to get back to step names.
    joblist = []
    jobmap = {}
    for step in self.in_progress:
        jobid = self.values[step].jobid[-1]
        joblist.append(jobid)
        jobmap[jobid] = step

    # Grab the adapter from the ScriptAdapterFactory.
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)
    # Use the adapter to grab the job statuses.
    retcode, job_status = adapter.check_jobs(joblist)
    # Map the job identifiers back to step names.
    step_status = {jobmap[jobid]: status
                   for jobid, status in job_status.items()}

    # Based on return code, log something different.
    if retcode == JobStatusCode.OK:
        LOGGER.info("Jobs found for user '%s'.", getpass.getuser())
        return retcode, step_status
    elif retcode == JobStatusCode.NOJOBS:
        LOGGER.info("No jobs found.")
        return retcode, step_status
    else:
        msg = "Unknown Error (Code = {})".format(retcode)
        LOGGER.error(msg)
        return retcode, step_status
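
A brief, hedged sketch of interpreting the return value for an ExecutionGraph instance graph: the method returns the adapter's JobStatusCode alongside a mapping from step name to State (the import path below is an assumption).

from maestrowf.abstracts.enums import JobStatusCode  # assumed import path

retcode, step_status = graph.check_study_status()
if retcode == JobStatusCode.OK:
    for step, state in step_status.items():
        print(step, state)
elif retcode == JobStatusCode.NOJOBS:
    print("No in-progress jobs were reported by the scheduler.")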

cleanup()

Clean up output produced by the ExecutionGraph.

Source code in maestrowf/datastructures/core/executiongraph.py
def cleanup(self):
    """Clean up output produced by the ExecutionGraph."""
    if self._tmp_dir:
        shutil.rmtree(self._tmp_dir, ignore_errors=True)

execute_ready_steps()

Execute any steps whose dependencies are satisfied.

The 'execute_ready_steps' method is the core of how the ExecutionGraph manages execution. This method does the following:

  • Checks the status of existing jobs that are executing and updates the state if changed.
  • Finds steps that are initialized and determines what can be run based on satisfied dependencies and executes steps whose dependencies are met.

Returns:

    A StudyStatus enum indicating whether the study is running, finished, failed, or cancelled.

Source code in maestrowf/datastructures/core/executiongraph.py
def execute_ready_steps(self):
    """
    Execute any steps whose dependencies are satisfied.

    The 'execute_ready_steps' method is the core of how the ExecutionGraph
    manages execution. This method does the following:

    * Checks the status of existing jobs that are executing and updates
      the state if changed.
    * Finds steps that are initialized and determines what can be run
      based on satisfied dependencies and executes steps whose
      dependencies are met.

    :returns: A StudyStatus enum indicating whether the study is
        running, finished, failed, or cancelled.
    """
    # TODO: We may want to move this to a singleton somewhere
    # so we can guarantee that all steps use the same adapter.
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)

    if not self.dry_run:
        LOGGER.debug("Checking status check...")
        retcode, job_status = self.check_study_status()
    else:
        LOGGER.debug("DRYRUN: Skipping status check...")
        retcode = JobStatusCode.OK
        job_status = {}

    LOGGER.debug("Checked status (retcode %s)-- %s", retcode, job_status)

    # For now, if we can't check the status something is wrong.
    # Don't modify the DAG.
    if retcode == JobStatusCode.ERROR:
        msg = "Job status check failed -- Aborting."
        LOGGER.error(msg)
        raise RuntimeError(msg)
    elif retcode == JobStatusCode.OK:
        # For the status of each currently in progress job, check its
        # state.
        cleanup_steps = set()  # Steps that are in progress showing failed.

        for name, status in job_status.items():
            LOGGER.debug("Checking job '%s' with status %s.", name, status)
            record = self.values[name]

            if status == State.FINISHED:
                # Mark the step complete and notate its end time.
                record.mark_end(State.FINISHED)
                LOGGER.info("Step '%s' marked as finished. Adding to "
                            "complete set.", name)
                self.completed_steps.add(name)
                self.in_progress.remove(name)

            elif status == State.RUNNING:
                # When we detect that a step is running, mark it.
                LOGGER.info("Step '%s' found to be running.", record.name)
                record.mark_running()

            elif status == State.TIMEDOUT:
                # Execute the restart script.
                # If a restart script doesn't exist, re-run the command.
                # If we're under the restart limit, attempt a restart.
                if record.can_restart:
                    if record.mark_restart():
                        LOGGER.info(
                            "Step '%s' timed out. Restarting (%s of %s).",
                            name, record.restarts, record.restart_limit
                        )
                        self._execute_record(record, adapter, restart=True)
                    else:
                        LOGGER.info("'%s' has been restarted %s of %s "
                                    "times. Marking step and all "
                                    "descendents as failed.",
                                    name,
                                    record.restarts,
                                    record.restart_limit)
                        self.in_progress.remove(name)
                        cleanup_steps.update(self.bfs_subtree(name)[0])
                # Otherwise, we can't restart so mark the step timed out.
                else:
                    LOGGER.info("'%s' timed out, but cannot be restarted."
                                " Marked as TIMEDOUT.", name)
                    # Mark that the step ended due to TIMEOUT.
                    record.mark_end(State.TIMEDOUT)
                    # Remove from in progress since it no longer is.
                    self.in_progress.remove(name)
                    # Add the subtree to the clean up steps
                    cleanup_steps.update(self.bfs_subtree(name)[0])
                    # Remove the current step; cleanup_steps is used to
                    # mark steps definitively as failed.
                    cleanup_steps.remove(name)
                    # Add the current step to failed.
                    self.failed_steps.add(name)

            elif status == State.HWFAILURE:
                # TODO: Need to make sure that we do this a finite number
                # of times.
                # Resubmit the cmd.
                LOGGER.warning("Hardware failure detected. Attempting to "
                               "resubmit step '%s'.", name)
                # We can just let the logic below handle submission with
                # everything else.
                self.ready_steps.append(name)

            elif status == State.FAILED:
                LOGGER.warning(
                    "Job failure reported. Aborting %s -- flagging all "
                    "dependent jobs as failed.",
                    name
                )
                self.in_progress.remove(name)
                record.mark_end(State.FAILED)
                cleanup_steps.update(self.bfs_subtree(name)[0])

            elif status == State.UNKNOWN:
                record.mark_end(State.UNKNOWN)
                LOGGER.info(
                    "Step '%s' found in UNKNOWN state. Step was found "
                    "in '%s' state previously, marking as UNKNOWN. "
                    "Adding to failed steps.",
                    name, record.status)
                cleanup_steps.update(self.bfs_subtree(name)[0])
                self.in_progress.remove(name)

            elif status == State.CANCELLED:
                LOGGER.info("Step '%s' was cancelled.", name)
                self.in_progress.remove(name)
                record.mark_end(State.CANCELLED)

        # Let's handle all the failed steps in one go.
        for node in cleanup_steps:
            self.failed_steps.add(node)
            self.values[node].mark_end(State.FAILED)

    # Now that we've checked the statuses of existing jobs, check whether
    # any pending steps now have all of their dependencies met.
    for key in self.values.keys():
        # We MUST dereference from the key. If we use values.items(), a
        # generator gets produced which will give us a COPY of a record and
        # not the actual record.
        record = self.values[key]

        # A completed step by definition has had its dependencies met.
        # Skip it.
        if key in self.completed_steps:
            LOGGER.debug("'%s' in completed set, skipping.", key)
            continue

        LOGGER.debug("Checking %s -- %s", key, record.jobid)
        # If the record is only INITIALIZED, we have encountered a step
        # that needs consideration.
        if record.status == State.INITIALIZED:
            LOGGER.debug("'%s' found to be initialized. Checking "
                         "dependencies. ", key)

            LOGGER.debug(
                "Unfulfilled dependencies: %s",
                self._dependencies[key])

            s_completed = filter(
                lambda x: x in self.completed_steps,
                self._dependencies[key])
            self._dependencies[key] = \
                self._dependencies[key] - set(s_completed)
            LOGGER.debug(
                "Completed dependencies: %s\n"
                "Remaining dependencies: %s",
                s_completed, self._dependencies[key])

            # If the gating dependencies set is empty, we can execute.
            if not self._dependencies[key]:
                if key not in self.ready_steps:
                    LOGGER.debug("All dependencies completed. Staging.")
                    self.ready_steps.append(key)
                else:
                    LOGGER.debug("Already staged. Passing.")
                    continue

    # We now have a collection of ready steps. Execute.
    # If we don't have a submission limit, go ahead and submit all.
    if self._submission_throttle == 0:
        LOGGER.info("Launching all ready steps...")
        _available = len(self.ready_steps)
    # Else, we have a limit -- adhere to it.
    else:
        # Compute the number of available slots we have for execution.
        _available = self._submission_throttle - len(self.in_progress)
        # Available slots should never be negative, but on the off chance
        # we are in a slot deficit, then we will just say none are free.
        _available = max(0, _available)
        # Now, we need to take the min of the length of the queue and the
        # computed number of slots. We could have free slots, but have less
        # in the queue.
        _available = min(_available, len(self.ready_steps))
        LOGGER.info("Found %d available slots...", _available)

    for i in range(0, _available):
        # Pop the record and execute using the helper method.
        _record = self.values[self.ready_steps.popleft()]

        # If we get to this point and we've cancelled, cancel the record.
        if self.is_canceled:
            LOGGER.info("Cancelling '%s' -- continuing.", _record.name)
            _record.mark_end(State.CANCELLED)
            self.cancelled_steps.add(_record.name)
            continue

        LOGGER.debug("Launching job %d -- %s", i, _record.name)
        self._execute_record(_record, adapter)

    # check the status of the study upon finishing this round of execution
    completion_status = self._check_study_completion()
    return completion_status
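
A hedged polling-loop sketch for an ExecutionGraph instance graph: each call checks job statuses, stages newly ready steps, submits them, and returns a StudyStatus, so callers typically keep calling it until the study leaves the RUNNING state. The StudyStatus import path, the output directory, and the sleep interval are assumptions.

import time

from maestrowf.abstracts.enums import StudyStatus  # assumed import path

status = graph.execute_ready_steps()
while status == StudyStatus.RUNNING:
    graph.write_status("/tmp/sample")  # optional: refresh status.csv each cycle
    time.sleep(30)                     # polling interval is a local choice
    status = graph.execute_ready_steps()

print("Study resolved with status:", status.name)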

generate_scripts()

Generate the scripts for all steps in the ExecutionGraph.

The generate_scripts method scans the ExecutionGraph instance and uses the stored adapter to write executable scripts for either local or scheduled execution. If a restart command is specified, a restart script will be generated for that record.

Source code in maestrowf/datastructures/core/executiongraph.py
def generate_scripts(self):
    """
    Generate the scripts for all steps in the ExecutionGraph.

    The generate_scripts method scans the ExecutionGraph instance and uses
    the stored adapter to write executable scripts for either local or
    scheduled execution. If a restart command is specified, a restart
    script will be generated for that record.
    """
    # An adapter must be specified
    if not self._adapter:
        msg = "Adapter not found. Specify a ScriptAdapter using " \
              "set_adapter."
        LOGGER.error(msg)
        raise ValueError(msg)

    # Set up the adapter.
    LOGGER.info("Generating scripts...")
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)

    self._check_tmp_dir()
    for key, record in self.values.items():
        if key == SOURCE:
            continue

        # Record generates its own script.
        record.setup_workspace()
        record.generate_script(adapter, self._tmp_dir)

log_description()

Log the description of the ExecutionGraph.

Source code in maestrowf/datastructures/core/executiongraph.py
def log_description(self):
    """Log the description of the ExecutionGraph."""
    desc = ["{}: {}".format(key, value)
            for key, value in self._description.items()]
    desc = "\n".join(desc)
    LOGGER.info(
        "\n==================================================\n"
        "%s\n"
        "==================================================\n",
        desc
    )

set_adapter(adapter)

Set the adapter used to interface for scheduling tasks.

Parameters:

    adapter (required): Dictionary of adapter settings to be used when launching the graph.
Source code in maestrowf/datastructures/core/executiongraph.py
def set_adapter(self, adapter):
    """
    Set the adapter used to interface for scheduling tasks.

    :param adapter: Dictionary of adapter settings to be used when
        launching the graph.
    """
    if not adapter:
        # If we have no adapter specified, assume sequential execution.
        self._adapter = None
        return

    if not isinstance(adapter, dict):
        msg = "Adapter settings must be contained in a dictionary."
        LOGGER.error(msg)
        raise TypeError(msg)

    # Check that the adapter type is registered with the
    # ScriptAdapterFactory.
    if adapter["type"] not in ScriptAdapterFactory.get_valid_adapters():
        msg = "'{}' adapter must be specfied in ScriptAdapterFactory." \
              .format(adapter)
        LOGGER.error(msg)
        raise TypeError(msg)

    self._adapter = adapter
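
A hedged example of the adapter settings dictionary: the 'type' key must name an adapter registered with the ScriptAdapterFactory, and the remaining keys are forwarded to that adapter's constructor. The 'local' and 'slurm' keys and the SLURM-style settings shown are illustrative assumptions, not an authoritative list.

# Local, unscheduled execution (assumes a 'local' adapter is registered).
graph.set_adapter({"type": "local"})

# A scheduled adapter; keys other than 'type' are passed through to the adapter.
graph.set_adapter({
    "type": "slurm",     # must appear in ScriptAdapterFactory.get_valid_adapters()
    "host": "cluster1",  # illustrative adapter-specific settings
    "bank": "my_bank",
    "queue": "pbatch",
})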

write_status(path)

Write the status of the DAG to a CSV file.

Source code in maestrowf/datastructures/core/executiongraph.py
def write_status(self, path):
    """Write the status of the DAG to a CSV file."""
    header = "Step Name,Job ID,Workspace,State,Run Time,Elapsed Time," \
             "Start Time,Submit Time,End Time,Number Restarts,Params"
    status = [header]

    for key in self.status_subtree:
        value = self.values[key]

        jobid_str = "--"
        if value.jobid:
            jobid_str = str(value.jobid[-1])

        # Include step root in workspace when parameterized
        if list(value.params.items()):
            ws = os.path.join(
                * os.path.normpath(
                    value.workspace.value).split(os.sep)[-2:]
            )
        else:
            ws = os.path.split(value.workspace.value)[1]

        _ = [
                value.name, jobid_str,
                ws,
                str(value.status.name), value.run_time, value.elapsed_time,
                value.time_start, value.time_submitted, value.time_end,
                str(value.restarts),
                ";".join(["{}:{}".format(param, value)
                          for param, value in value.params.items()])
            ]
        _ = ",".join(_)
        status.append(_)

    stat_path = os.path.join(path, "status.csv")
    lock_path = os.path.join(path, ".status.lock")
    lock = FileLock(lock_path)
    try:
        with lock.acquire(timeout=10):
            with open(stat_path, "w+") as stat_file:
                stat_file.write("\n".join(status))
    except Timeout:
        pass
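
A short, hedged usage sketch for an ExecutionGraph instance graph: write_status serializes the per-step status into status.csv under the given path, guarded by a .status.lock file; if the lock cannot be acquired within 10 seconds the write is silently skipped.

graph.write_status("/tmp/sample")
# Produces /tmp/sample/status.csv with the columns:
# Step Name, Job ID, Workspace, State, Run Time, Elapsed Time,
# Start Time, Submit Time, End Time, Number Restarts, Params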