Skip to content

Executiongraph

Module for the execution of DAG workflows.

ExecutionGraph

Bases: DAG, PickleInterface

Datastructure that tracks, executes, and reports on study execution.

The ExecutionGraph is used to manage, monitor, and interact with tasks and the scheduler. This class searches its graph for tasks that are ready to run, marks tasks as complete, and schedules ready tasks.

The Execution class is where functionality for checking task status, logic for managing and automatically directing and manipulating the workflow should go. Essentially, if logic is needed to automatically manipulate the workflow in some fashion or additional monitoring is needed, this class is where that would go.

Source code in maestrowf/datastructures/core/executiongraph.py
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
class ExecutionGraph(DAG, PickleInterface):
    """
    Datastructure that tracks, executes, and reports on study execution.

    The ExecutionGraph is used to manage, monitor, and interact with tasks and
    the scheduler. This class searches its graph for tasks that are ready to
    run, marks tasks as complete, and schedules ready tasks.

    The Execution class is where functionality for checking task status, logic
    for managing and automatically directing and manipulating the workflow
    should go. Essentially, if logic is needed to automatically manipulate the
    workflow in some fashion or additional monitoring is needed, this class is
    where that would go.
    """

    def __init__(self, submission_attempts=1, submission_throttle=0,
                 use_tmp=False, dry_run=False):
        """
        Initialize a new instance of an ExecutionGraph.

        :param submission_attempts: Number of attempted submissions before
            marking a step as failed.
        :param submission_throttle: Maximum number of scheduled in progress
        submissions.
        :param use_tmp: A Boolean value that when set to 'True' designates
        that ExecutionGraph should use temporary files for output.
        """
        super(ExecutionGraph, self).__init__()
        # Member variables for execution.
        self._adapter = None
        self._description = OrderedDict()

        # Generate tempdir (if specfied)
        if use_tmp:
            self._tmp_dir = tempfile.mkdtemp()
        else:
            self._tmp_dir = ""

        # Sets to track progress.
        self.completed_steps = set([SOURCE])
        self.in_progress = set()
        self.failed_steps = set()
        self.cancelled_steps = set()
        self.ready_steps = deque()
        self.is_canceled = False

        self._status_order = 'bfs'  # Set status order type
        self._status_subtree = None  # Cache bfs_subtree for status writing

        # Values for management of the DAG. Things like submission attempts,
        # throttling, etc. should be listed here.
        self._submission_attempts = submission_attempts
        self._submission_throttle = submission_throttle
        self.dry_run = dry_run

        # A map that tracks the dependencies of a step.
        # NOTE: I don't know how performant the Python dict structure is, but
        # we'll use it for now. I think this may want to be changed to an AVL
        # tree or something of that nature to guarantee worst case performance.
        self._dependencies = {}

        LOGGER.info(
            "\n------------------------------------------\n"
            "Submission attempts =       %d\n"
            "Submission throttle limit = %d\n"
            "Use temporary directory =   %s\n"
            "Tmp Dir = %s\n"
            "------------------------------------------",
            submission_attempts, submission_throttle, use_tmp, self._tmp_dir
        )

        # Error check that the submission values are valid.
        if self._submission_attempts < 1:
            _msg = "Submission attempts should always be greater than 0. " \
                   "Received a value of {}.".format(self._submission_attempts)
            LOGGER.error(_msg)
            raise ValueError(_msg)

        if self._submission_throttle < 0:
            _msg = "Throttling should be 0 for unthrottled or a positive " \
                   "integer for the number of allowed inflight jobs. " \
                   "Received a value of {}.".format(self._submission_throttle)
            LOGGER.error(_msg)
            raise ValueError(_msg)

    def _check_tmp_dir(self):
        """Check and recreate the tempdir should it have been erased."""
        # If we've specified a tmp dir and the previous tmp dir doesn't exist
        # recreate it.
        if self._tmp_dir and not os.path.exists(self._tmp_dir):
            self._tmp_dir = tempfile.mkdtemp()

    def add_step(self, name, step, workspace, restart_limit, params=None):
        """
        Add a StepRecord to the ExecutionGraph.

        :param name: Name of the step to be added.
        :param step: StudyStep instance to be recorded.
        :param workspace: Directory path for the step's working directory.
        :param restart_limit: Upper limit on the number of restart attempts.
        :param params: Iterable of tuples of step parameter names, values
        """
        data = {
                    "step":          step,
                    "state":         State.INITIALIZED,
                    "workspace":     workspace,
                    "restart_limit": restart_limit,
                }
        record = _StepRecord(**data)
        if params:
            record.add_params(params)

        self._dependencies[name] = set()
        super(ExecutionGraph, self).add_node(name, record)

    def add_connection(self, parent, step):
        """
        Add a connection between two steps in the ExecutionGraph.

        :param parent: The parent step that is required to execute 'step'
        :param step: The dependent step that relies on parent.
        """
        self.add_edge(parent, step)
        self._dependencies[step].add(parent)

    def set_adapter(self, adapter):
        """
        Set the adapter used to interface for scheduling tasks.

        :param adapter: Adapter name to be used when launching the graph.
        """
        if not adapter:
            # If we have no adapter specified, assume sequential execution.
            self._adapter = None
            return

        if not isinstance(adapter, dict):
            msg = "Adapter settings must be contained in a dictionary."
            LOGGER.error(msg)
            raise TypeError(msg)

        # Check to see that the adapter type is something the
        if adapter["type"] not in ScriptAdapterFactory.get_valid_adapters():
            msg = "'{}' adapter must be specfied in ScriptAdapterFactory." \
                  .format(adapter)
            LOGGER.error(msg)
            raise TypeError(msg)

        self._adapter = adapter

    def add_description(self, name, description, **kwargs):
        """
        Add a study description to the ExecutionGraph instance.

        :param name: Name of the study.
        :param description: Description of the study.
        """
        self._description["name"] = name
        self._description["description"] = description
        self._description.update(kwargs)

    @property
    def name(self):
        """
        Return the name for the study in the ExecutionGraph instance.

        :returns: A string of the name of the study.
        """
        return self._description["name"]

    @name.setter
    def name(self, value):
        """
        Set the name for the study in the ExecutionGraph instance.

        :param name: A string of the name for the study.
        """
        self._description["name"] = value

    @property
    def description(self):
        """
        Return the description for the study in the ExecutionGraph instance.

        :returns: A string of the description for the study.
        """
        return self._description["description"]

    @description.setter
    def description(self, value):
        """
        Set the description for the study in the ExecutionGraph instance.

        :param value: A string of the description for the study.
        """
        self._description["description"] = value

    def log_description(self):
        """Log the description of the ExecutionGraph."""
        desc = ["{}: {}".format(key, value)
                for key, value in self._description.items()]
        desc = "\n".join(desc)
        LOGGER.info(
            "\n==================================================\n"
            "%s\n"
            "==================================================\n",
            desc
        )

    def generate_scripts(self):
        """
        Generate the scripts for all steps in the ExecutionGraph.

        The generate_scripts method scans the ExecutionGraph instance and uses
        the stored adapter to write executable scripts for either local or
        scheduled execution. If a restart command is specified, a restart
        script will be generated for that record.
        """
        # An adapter must be specified
        if not self._adapter:
            msg = "Adapter not found. Specify a ScriptAdapter using " \
                  "set_adapter."
            LOGGER.error(msg)
            raise ValueError(msg)

        # Set up the adapter.
        LOGGER.info("Generating scripts...")
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)

        self._check_tmp_dir()
        for key, record in self.values.items():
            if key == SOURCE:
                continue

            # Record generates its own script.
            record.setup_workspace()
            record.generate_script(adapter, self._tmp_dir)

    def _execute_record(self, record, adapter, restart=False):
        """
        Execute a StepRecord.

        :param record: The StepRecord to be executed.
        :param adapter: An instance of the adapter to be used for cluster
        submission.
        :param restart: True if the record needs restarting, False otherwise.
        """
        # Logging for debugging.
        LOGGER.info("Calling execute for StepRecord '%s'", record.name)

        num_restarts = 0    # Times this step has temporally restarted.
        retcode = None      # Execution return code.

        # While our submission needs to be submitted, keep trying:
        # 1. If the JobStatus is not OK.
        # 2. num_restarts is less than self._submission_attempts
        self._check_tmp_dir()

        # Only set up the workspace the initial iteration.
        if not restart:
            LOGGER.debug("Setting up workspace for '%s' at %s",
                         record.name, str(datetime.now()))
            # Generate the script for execution on the fly.
            record.setup_workspace()    # Generate the workspace.
            record.generate_script(adapter, self._tmp_dir)

        if self.dry_run:
            record.mark_end(State.DRYRUN)
            self.completed_steps.add(record.name)
            return

        while retcode != SubmissionCode.OK and \
                num_restarts < self._submission_attempts:
            LOGGER.info("Attempting submission of '%s' (attempt %d of %d)...",
                        record.name, num_restarts + 1,
                        self._submission_attempts)

            # We're not restarting -- submit as usual.
            if not restart:
                LOGGER.debug("Calling 'execute' on '%s' at %s",
                             record.name, str(datetime.now()))
                retcode = record.execute(adapter)
            # Otherwise, it's a restart.
            else:
                # If the restart is specified, use the record restart script.
                LOGGER.debug("Calling 'restart' on '%s' at %s",
                             record.name, str(datetime.now()))
                # Generate the script for execution on the fly.
                record.generate_script(adapter, self._tmp_dir)
                retcode = record.restart(adapter)

            # Increment the number of restarts we've attempted.
            LOGGER.debug("Completed submission attempt %d", num_restarts)
            num_restarts += 1
            sleep((random.random() + 1) * num_restarts)

        if retcode == SubmissionCode.OK:
            self.in_progress.add(record.name)

            if record.is_local_step:
                LOGGER.info("Local step %s executed with status OK. Complete.",
                            record.name)
                record.mark_end(State.FINISHED)
                self.completed_steps.add(record.name)
                self.in_progress.remove(record.name)
        else:
            # Find the subtree, because anything dependent on this step now
            # failed.
            LOGGER.warning("'%s' failed to submit properly. "
                           "Step failed.", record.name)
            path, parent = self.bfs_subtree(record.name)
            for node in path:
                self.failed_steps.add(node)
                self.values[node].mark_end(State.FAILED)

        # After execution state debug logging.
        LOGGER.debug("After execution of '%s' -- New state is %s.",
                     record.name, record.status)

    @property
    def status_subtree(self):
        """Cache the status ordering to improve scaling"""
        if not self._status_subtree:
            if self._status_order == 'bfs':
                subtree, _ = self.bfs_subtree("_source")

            elif self._status_order == 'dfs':
                subtree, _ = self.dfs_subtree("_source", par="_source")

            self._status_subtree = [key for key in subtree
                                    if key != '_source']

        return self._status_subtree

    def write_status(self, path):
        """Write the status of the DAG to a CSV file."""
        header = "Step Name,Job ID,Workspace,State,Run Time,Elapsed Time," \
                 "Start Time,Submit Time,End Time,Number Restarts,Params"
        status = [header]

        for key in self.status_subtree:
            value = self.values[key]

            jobid_str = "--"
            if value.jobid:
                jobid_str = str(value.jobid[-1])

            # Include step root in workspace when parameterized
            if list(value.params.items()):
                ws = os.path.join(
                    * os.path.normpath(
                        value.workspace.value).split(os.sep)[-2:]
                )
            else:
                ws = os.path.split(value.workspace.value)[1]

            _ = [
                    value.name, jobid_str,
                    ws,
                    str(value.status.name), value.run_time, value.elapsed_time,
                    value.time_start, value.time_submitted, value.time_end,
                    str(value.restarts),
                    ";".join(["{}:{}".format(param, value)
                              for param, value in value.params.items()])
                ]
            _ = ",".join(_)
            status.append(_)

        stat_path = os.path.join(path, "status.csv")
        lock_path = os.path.join(path, ".status.lock")
        lock = FileLock(lock_path)
        try:
            with lock.acquire(timeout=10):
                with open(stat_path, "w+") as stat_file:
                    stat_file.write("\n".join(status))
        except Timeout:
            pass

    def _check_study_completion(self):
        # We cancelled, return True marking study as complete.
        if self.is_canceled and not self.in_progress:
            LOGGER.info("Cancelled -- completing study.")
            return StudyStatus.CANCELLED

        # check for completion of all steps
        resolved_set = \
            self.completed_steps | self.failed_steps | self.cancelled_steps
        if not set(self.values.keys()) - resolved_set:
            # some steps were cancelled and is_canceled wasn't set
            if len(self.cancelled_steps) > 0:
                logging.info("'%s' was cancelled. Returning.", self.name)
                return StudyStatus.CANCELLED

            # some steps were failures indicating failure
            if len(self.failed_steps) > 0:
                logging.info("'%s' is complete with failures. Returning.",
                             self.name)
                return StudyStatus.FAILURE

            # everything completed were are done
            logging.info("'%s' is complete. Returning.", self.name)
            return StudyStatus.FINISHED

        return StudyStatus.RUNNING

    def execute_ready_steps(self):
        """
        Execute any steps whose dependencies are satisfied.

        The 'execute_ready_steps' method is the core of how the ExecutionGraph
        manages execution. This method does the following:

        * Checks the status of existing jobs that are executing and updates
          the state if changed.
        * Finds steps that are initialized and determines what can be run
          based on satisfied dependencies and executes steps whose
          dependencies are met.

        :returns: True if the study has completed, False otherwise.
        """
        # TODO: We may want to move this to a singleton somewhere
        # so we can guarantee that all steps use the same adapter.
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)

        if not self.dry_run:
            LOGGER.debug("Checking status check...")
            retcode, job_status = self.check_study_status()
        else:
            LOGGER.debug("DRYRUN: Skipping status check...")
            retcode = JobStatusCode.OK
            job_status = {}

        LOGGER.debug("Checked status (retcode %s)-- %s", retcode, job_status)

        # For now, if we can't check the status something is wrong.
        # Don't modify the DAG.
        if retcode == JobStatusCode.ERROR:
            msg = "Job status check failed -- Aborting."
            LOGGER.error(msg)
            raise RuntimeError(msg)
        elif retcode == JobStatusCode.OK:
            # For the status of each currently in progress job, check its
            # state.
            cleanup_steps = set()  # Steps that are in progress showing failed.
            cancel_steps = set()   # Steps that have dependencies to mark cancelled
            for name, status in job_status.items():
                LOGGER.debug("Checking job '%s' with status %s.", name, status)
                record = self.values[name]

                if status == State.FINISHED:
                    # Mark the step complete and notate its end time.
                    record.mark_end(State.FINISHED)
                    LOGGER.info("Step '%s' marked as finished. Adding to "
                                "complete set.", name)
                    self.completed_steps.add(name)
                    self.in_progress.remove(name)

                elif status == State.RUNNING:
                    # When detect that a step is running, mark it.
                    LOGGER.info("Step '%s' found to be running.", record.name)
                    record.mark_running()

                elif status == State.TIMEDOUT:
                    # Execute the restart script.
                    # If a restart script doesn't exist, re-run the command.
                    # If we're under the restart limit, attempt a restart.
                    if record.can_restart:
                        if record.mark_restart():
                            LOGGER.info(
                                "Step '%s' timed out. Restarting (%s of %s).",
                                name, record.restarts, record.restart_limit
                            )
                            self._execute_record(record, adapter, restart=True)
                        else:
                            LOGGER.info("'%s' has been restarted %s of %s "
                                        "times. Marking step and all "
                                        "descendents as failed.",
                                        name,
                                        record.restarts,
                                        record.restart_limit)
                            self.in_progress.remove(name)
                            cleanup_steps.update(self.bfs_subtree(name)[0])
                    # Otherwise, we can't restart so mark the step timed out.
                    else:
                        LOGGER.info("'%s' timed out, but cannot be restarted."
                                    " Marked as TIMEDOUT.", name)
                        # Mark that the step ended due to TIMEOUT.
                        record.mark_end(State.TIMEDOUT)
                        # Remove from in progress since it no longer is.
                        self.in_progress.remove(name)
                        # Add the subtree to the clean up steps
                        cleanup_steps.update(self.bfs_subtree(name)[0])
                        # Remove the current step, clean up is used to mark
                        # steps definitively as failed.
                        cleanup_steps.remove(name)
                        # Add the current step to failed.
                        self.failed_steps.add(name)

                elif status == State.HWFAILURE:
                    # TODO: Need to make sure that we do this a finite number
                    # of times.
                    # Resubmit the cmd.
                    LOGGER.warning("Hardware failure detected. Attempting to "
                                   "resubmit step '%s'.", name)
                    # We can just let the logic below handle submission with
                    # everything else.
                    self.ready_steps.append(name)

                elif status == State.FAILED:
                    LOGGER.warning(
                        "Job failure reported. Aborting %s -- flagging all "
                        "dependent jobs as failed.",
                        name
                    )
                    self.in_progress.remove(name)
                    record.mark_end(State.FAILED)
                    cleanup_steps.update(self.bfs_subtree(name)[0])

                elif status == State.UNKNOWN:
                    record.mark_end(State.UNKNOWN)
                    LOGGER.info(
                        "Step '%s' found in UNKNOWN state. Step was found "
                        "in '%s' state previously, marking as UNKNOWN. "
                        "Adding to failed steps.",
                        name, record.status)
                    cleanup_steps.update(self.bfs_subtree(name)[0])
                    self.in_progress.remove(name)

                elif status == State.CANCELLED:
                    LOGGER.info("Step '%s' was cancelled.", name)
                    self.in_progress.remove(name)
                    record.mark_end(State.CANCELLED)
                    cancel_steps.update(self.bfs_subtree(name)[0])

            # Let's handle all the failed steps in one go.
            for node in cleanup_steps:
                self.failed_steps.add(node)
                self.values[node].mark_end(State.FAILED)

            # Handle dependent steps that need cancelling
            for node in cancel_steps:
                self.cancelled_steps.add(node)
                self.values[node].mark_end(State.CANCELLED)

        # Now that we've checked the statuses of existing jobs we need to make
        # sure dependencies haven't been met.
        for key in self.values.keys():
            # We MUST dereference from the key. If we use values.items(), a
            # generator gets produced which will give us a COPY of a record and
            # not the actual record.
            record = self.values[key]

            # A completed step by definition has had its dependencies met.
            # Skip it.
            if key in self.completed_steps:
                LOGGER.debug("'%s' in completed set, skipping.", key)
                continue

            LOGGER.debug("Checking %s -- %s", key, record.jobid)
            # If the record is only INITIALIZED, we have encountered a step
            # that needs consideration.
            if record.status == State.INITIALIZED:
                LOGGER.debug("'%s' found to be initialized. Checking "
                             "dependencies. ", key)

                LOGGER.debug(
                    "Unfulfilled dependencies: %s",
                    self._dependencies[key])

                s_completed = list(filter(
                    lambda x: x in self.completed_steps,
                    self._dependencies[key]))
                self._dependencies[key] = \
                    self._dependencies[key] - set(s_completed)
                LOGGER.debug(
                    "Completed dependencies: %s\n"
                    "Remaining dependencies: %s",
                    s_completed, self._dependencies[key])

                # If the gating dependencies set is empty, we can execute.
                if not self._dependencies[key]:
                    if key not in self.ready_steps:
                        LOGGER.debug("All dependencies completed. Staging.")
                        self.ready_steps.append(key)
                    else:
                        LOGGER.debug("Already staged. Passing.")
                        continue

        # We now have a collection of ready steps. Execute.
        # If we don't have a submission limit, go ahead and submit all.
        if self._submission_throttle == 0:
            LOGGER.info("Launching all ready steps...")
            _available = len(self.ready_steps)
        # Else, we have a limit -- adhere to it.
        else:
            # Compute the number of available slots we have for execution.
            _available = self._submission_throttle - len(self.in_progress)
            # Available slots should never be negative, but on the off chance
            # we are in a slot deficit, then we will just say none are free.
            _available = max(0, _available)
            # Now, we need to take the min of the length of the queue and the
            # computed number of slots. We could have free slots, but have less
            # in the queue.
            _available = min(_available, len(self.ready_steps))
            LOGGER.info("Found %d available slots...", _available)

        for i in range(0, _available):
            # Pop the record and execute using the helper method.
            _record = self.values[self.ready_steps.popleft()]

            # If we get to this point and we've cancelled, cancel the record.
            if self.is_canceled:
                LOGGER.info("Cancelling '%s' -- continuing.", _record.name)
                _record.mark_end(State.CANCELLED)
                self.cancelled_steps.add(_record.name)
                continue

            LOGGER.debug("Launching job %d -- %s", i, _record.name)
            self._execute_record(_record, adapter)

        # check the status of the study upon finishing this round of execution
        completion_status = self._check_study_completion()
        return completion_status

    def check_study_status(self):
        """
        Check the status of currently executing steps in the graph.

        This method is used to check the status of all currently in progress
        steps in the ExecutionGraph. Each ExecutionGraph stores the adapter
        used to generate and execute its scripts.
        """
        # Set up the job list and the map to get back to step names.
        joblist = []
        jobmap = {}
        for step in self.in_progress:
            jobid = self.values[step].jobid[-1]
            joblist.append(jobid)
            jobmap[jobid] = step

        # Grab the adapter from the ScriptAdapterFactory.
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)
        # Use the adapter to grab the job statuses.
        retcode, job_status = adapter.check_jobs(joblist)
        # Map the job identifiers back to step names.
        step_status = {jobmap[jobid]: status
                       for jobid, status in job_status.items()}

        # Based on return code, log something different.
        if retcode == JobStatusCode.OK:
            LOGGER.info("Jobs found for user '%s'.", getpass.getuser())
            return retcode, step_status
        elif retcode == JobStatusCode.NOJOBS:
            LOGGER.info("No jobs found.")
            return retcode, step_status
        else:
            msg = "Unknown Error (Code = {})".format(retcode)
            LOGGER.error(msg)
            return retcode, step_status

    def cancel_study(self):
        """Cancel the study."""
        joblist = []
        for step in self.in_progress:
            jobid = self.values[step].jobid[-1]
            joblist.append(jobid)

        # Grab the adapter from the ScriptAdapterFactory.
        adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
        adapter = adapter(**self._adapter)

        # cancel our jobs
        crecord = adapter.cancel_jobs(joblist)
        self.is_canceled = True

        if crecord.cancel_status == CancelCode.OK:
            LOGGER.info("Successfully requested to cancel all jobs.")
        elif crecord.cancel_status == CancelCode.ERROR:
            LOGGER.error(
                "Failed to cancel jobs. (Code = %s)", crecord.return_code)
        else:
            LOGGER.error("Unknown Error (Code = %s)", crecord.return_code)

        return crecord.cancel_status

    def cleanup(self):
        """Clean up output produced by the ExecutionGraph."""
        if self._tmp_dir:
            shutil.rmtree(self._tmp_dir, ignore_errors=True)

description writable property

Return the description for the study in the ExecutionGraph instance.

Returns:

Type Description

A string of the description for the study.

name writable property

Return the name for the study in the ExecutionGraph instance.

Returns:

Type Description

A string of the name of the study.

status_subtree property

Cache the status ordering to improve scaling

__init__(submission_attempts=1, submission_throttle=0, use_tmp=False, dry_run=False)

Initialize a new instance of an ExecutionGraph.

Parameters:

Name Type Description Default
submission_attempts

Number of attempted submissions before marking a step as failed.

1
submission_throttle

Maximum number of scheduled in progress submissions.

0
use_tmp

A Boolean value that when set to 'True' designates that ExecutionGraph should use temporary files for output.

False
Source code in maestrowf/datastructures/core/executiongraph.py
def __init__(self, submission_attempts=1, submission_throttle=0,
             use_tmp=False, dry_run=False):
    """
    Initialize a new instance of an ExecutionGraph.

    :param submission_attempts: Number of attempted submissions before
        marking a step as failed.
    :param submission_throttle: Maximum number of scheduled in progress
    submissions.
    :param use_tmp: A Boolean value that when set to 'True' designates
    that ExecutionGraph should use temporary files for output.
    """
    super(ExecutionGraph, self).__init__()
    # Member variables for execution.
    self._adapter = None
    self._description = OrderedDict()

    # Generate tempdir (if specfied)
    if use_tmp:
        self._tmp_dir = tempfile.mkdtemp()
    else:
        self._tmp_dir = ""

    # Sets to track progress.
    self.completed_steps = set([SOURCE])
    self.in_progress = set()
    self.failed_steps = set()
    self.cancelled_steps = set()
    self.ready_steps = deque()
    self.is_canceled = False

    self._status_order = 'bfs'  # Set status order type
    self._status_subtree = None  # Cache bfs_subtree for status writing

    # Values for management of the DAG. Things like submission attempts,
    # throttling, etc. should be listed here.
    self._submission_attempts = submission_attempts
    self._submission_throttle = submission_throttle
    self.dry_run = dry_run

    # A map that tracks the dependencies of a step.
    # NOTE: I don't know how performant the Python dict structure is, but
    # we'll use it for now. I think this may want to be changed to an AVL
    # tree or something of that nature to guarantee worst case performance.
    self._dependencies = {}

    LOGGER.info(
        "\n------------------------------------------\n"
        "Submission attempts =       %d\n"
        "Submission throttle limit = %d\n"
        "Use temporary directory =   %s\n"
        "Tmp Dir = %s\n"
        "------------------------------------------",
        submission_attempts, submission_throttle, use_tmp, self._tmp_dir
    )

    # Error check that the submission values are valid.
    if self._submission_attempts < 1:
        _msg = "Submission attempts should always be greater than 0. " \
               "Received a value of {}.".format(self._submission_attempts)
        LOGGER.error(_msg)
        raise ValueError(_msg)

    if self._submission_throttle < 0:
        _msg = "Throttling should be 0 for unthrottled or a positive " \
               "integer for the number of allowed inflight jobs. " \
               "Received a value of {}.".format(self._submission_throttle)
        LOGGER.error(_msg)
        raise ValueError(_msg)

add_connection(parent, step)

Add a connection between two steps in the ExecutionGraph.

Parameters:

Name Type Description Default
parent

The parent step that is required to execute 'step'

required
step

The dependent step that relies on parent.

required
Source code in maestrowf/datastructures/core/executiongraph.py
def add_connection(self, parent, step):
    """
    Add a connection between two steps in the ExecutionGraph.

    :param parent: The parent step that is required to execute 'step'
    :param step: The dependent step that relies on parent.
    """
    self.add_edge(parent, step)
    self._dependencies[step].add(parent)

add_description(name, description, **kwargs)

Add a study description to the ExecutionGraph instance.

Parameters:

Name Type Description Default
name

Name of the study.

required
description

Description of the study.

required
Source code in maestrowf/datastructures/core/executiongraph.py
def add_description(self, name, description, **kwargs):
    """
    Add a study description to the ExecutionGraph instance.

    :param name: Name of the study.
    :param description: Description of the study.
    """
    self._description["name"] = name
    self._description["description"] = description
    self._description.update(kwargs)

add_step(name, step, workspace, restart_limit, params=None)

Add a StepRecord to the ExecutionGraph.

Parameters:

Name Type Description Default
name

Name of the step to be added.

required
step

StudyStep instance to be recorded.

required
workspace

Directory path for the step's working directory.

required
restart_limit

Upper limit on the number of restart attempts.

required
params

Iterable of tuples of step parameter names, values

None
Source code in maestrowf/datastructures/core/executiongraph.py
def add_step(self, name, step, workspace, restart_limit, params=None):
    """
    Add a StepRecord to the ExecutionGraph.

    :param name: Name of the step to be added.
    :param step: StudyStep instance to be recorded.
    :param workspace: Directory path for the step's working directory.
    :param restart_limit: Upper limit on the number of restart attempts.
    :param params: Iterable of tuples of step parameter names, values
    """
    data = {
                "step":          step,
                "state":         State.INITIALIZED,
                "workspace":     workspace,
                "restart_limit": restart_limit,
            }
    record = _StepRecord(**data)
    if params:
        record.add_params(params)

    self._dependencies[name] = set()
    super(ExecutionGraph, self).add_node(name, record)

cancel_study()

Cancel the study.

Source code in maestrowf/datastructures/core/executiongraph.py
def cancel_study(self):
    """Cancel the study."""
    joblist = []
    for step in self.in_progress:
        jobid = self.values[step].jobid[-1]
        joblist.append(jobid)

    # Grab the adapter from the ScriptAdapterFactory.
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)

    # cancel our jobs
    crecord = adapter.cancel_jobs(joblist)
    self.is_canceled = True

    if crecord.cancel_status == CancelCode.OK:
        LOGGER.info("Successfully requested to cancel all jobs.")
    elif crecord.cancel_status == CancelCode.ERROR:
        LOGGER.error(
            "Failed to cancel jobs. (Code = %s)", crecord.return_code)
    else:
        LOGGER.error("Unknown Error (Code = %s)", crecord.return_code)

    return crecord.cancel_status

check_study_status()

Check the status of currently executing steps in the graph.

This method is used to check the status of all currently in progress steps in the ExecutionGraph. Each ExecutionGraph stores the adapter used to generate and execute its scripts.

Source code in maestrowf/datastructures/core/executiongraph.py
def check_study_status(self):
    """
    Check the status of currently executing steps in the graph.

    This method is used to check the status of all currently in progress
    steps in the ExecutionGraph. Each ExecutionGraph stores the adapter
    used to generate and execute its scripts.
    """
    # Set up the job list and the map to get back to step names.
    joblist = []
    jobmap = {}
    for step in self.in_progress:
        jobid = self.values[step].jobid[-1]
        joblist.append(jobid)
        jobmap[jobid] = step

    # Grab the adapter from the ScriptAdapterFactory.
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)
    # Use the adapter to grab the job statuses.
    retcode, job_status = adapter.check_jobs(joblist)
    # Map the job identifiers back to step names.
    step_status = {jobmap[jobid]: status
                   for jobid, status in job_status.items()}

    # Based on return code, log something different.
    if retcode == JobStatusCode.OK:
        LOGGER.info("Jobs found for user '%s'.", getpass.getuser())
        return retcode, step_status
    elif retcode == JobStatusCode.NOJOBS:
        LOGGER.info("No jobs found.")
        return retcode, step_status
    else:
        msg = "Unknown Error (Code = {})".format(retcode)
        LOGGER.error(msg)
        return retcode, step_status

cleanup()

Clean up output produced by the ExecutionGraph.

Source code in maestrowf/datastructures/core/executiongraph.py
def cleanup(self):
    """Clean up output produced by the ExecutionGraph."""
    if self._tmp_dir:
        shutil.rmtree(self._tmp_dir, ignore_errors=True)

execute_ready_steps()

Execute any steps whose dependencies are satisfied.

The 'execute_ready_steps' method is the core of how the ExecutionGraph manages execution. This method does the following:

  • Checks the status of existing jobs that are executing and updates the state if changed.
  • Finds steps that are initialized and determines what can be run based on satisfied dependencies and executes steps whose dependencies are met.

Returns:

Type Description

True if the study has completed, False otherwise.

Source code in maestrowf/datastructures/core/executiongraph.py
def execute_ready_steps(self):
    """
    Execute any steps whose dependencies are satisfied.

    The 'execute_ready_steps' method is the core of how the ExecutionGraph
    manages execution. This method does the following:

    * Checks the status of existing jobs that are executing and updates
      the state if changed.
    * Finds steps that are initialized and determines what can be run
      based on satisfied dependencies and executes steps whose
      dependencies are met.

    :returns: True if the study has completed, False otherwise.
    """
    # TODO: We may want to move this to a singleton somewhere
    # so we can guarantee that all steps use the same adapter.
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)

    if not self.dry_run:
        LOGGER.debug("Checking status check...")
        retcode, job_status = self.check_study_status()
    else:
        LOGGER.debug("DRYRUN: Skipping status check...")
        retcode = JobStatusCode.OK
        job_status = {}

    LOGGER.debug("Checked status (retcode %s)-- %s", retcode, job_status)

    # For now, if we can't check the status something is wrong.
    # Don't modify the DAG.
    if retcode == JobStatusCode.ERROR:
        msg = "Job status check failed -- Aborting."
        LOGGER.error(msg)
        raise RuntimeError(msg)
    elif retcode == JobStatusCode.OK:
        # For the status of each currently in progress job, check its
        # state.
        cleanup_steps = set()  # Steps that are in progress showing failed.
        cancel_steps = set()   # Steps that have dependencies to mark cancelled
        for name, status in job_status.items():
            LOGGER.debug("Checking job '%s' with status %s.", name, status)
            record = self.values[name]

            if status == State.FINISHED:
                # Mark the step complete and notate its end time.
                record.mark_end(State.FINISHED)
                LOGGER.info("Step '%s' marked as finished. Adding to "
                            "complete set.", name)
                self.completed_steps.add(name)
                self.in_progress.remove(name)

            elif status == State.RUNNING:
                # When detect that a step is running, mark it.
                LOGGER.info("Step '%s' found to be running.", record.name)
                record.mark_running()

            elif status == State.TIMEDOUT:
                # Execute the restart script.
                # If a restart script doesn't exist, re-run the command.
                # If we're under the restart limit, attempt a restart.
                if record.can_restart:
                    if record.mark_restart():
                        LOGGER.info(
                            "Step '%s' timed out. Restarting (%s of %s).",
                            name, record.restarts, record.restart_limit
                        )
                        self._execute_record(record, adapter, restart=True)
                    else:
                        LOGGER.info("'%s' has been restarted %s of %s "
                                    "times. Marking step and all "
                                    "descendents as failed.",
                                    name,
                                    record.restarts,
                                    record.restart_limit)
                        self.in_progress.remove(name)
                        cleanup_steps.update(self.bfs_subtree(name)[0])
                # Otherwise, we can't restart so mark the step timed out.
                else:
                    LOGGER.info("'%s' timed out, but cannot be restarted."
                                " Marked as TIMEDOUT.", name)
                    # Mark that the step ended due to TIMEOUT.
                    record.mark_end(State.TIMEDOUT)
                    # Remove from in progress since it no longer is.
                    self.in_progress.remove(name)
                    # Add the subtree to the clean up steps
                    cleanup_steps.update(self.bfs_subtree(name)[0])
                    # Remove the current step, clean up is used to mark
                    # steps definitively as failed.
                    cleanup_steps.remove(name)
                    # Add the current step to failed.
                    self.failed_steps.add(name)

            elif status == State.HWFAILURE:
                # TODO: Need to make sure that we do this a finite number
                # of times.
                # Resubmit the cmd.
                LOGGER.warning("Hardware failure detected. Attempting to "
                               "resubmit step '%s'.", name)
                # We can just let the logic below handle submission with
                # everything else.
                self.ready_steps.append(name)

            elif status == State.FAILED:
                LOGGER.warning(
                    "Job failure reported. Aborting %s -- flagging all "
                    "dependent jobs as failed.",
                    name
                )
                self.in_progress.remove(name)
                record.mark_end(State.FAILED)
                cleanup_steps.update(self.bfs_subtree(name)[0])

            elif status == State.UNKNOWN:
                record.mark_end(State.UNKNOWN)
                LOGGER.info(
                    "Step '%s' found in UNKNOWN state. Step was found "
                    "in '%s' state previously, marking as UNKNOWN. "
                    "Adding to failed steps.",
                    name, record.status)
                cleanup_steps.update(self.bfs_subtree(name)[0])
                self.in_progress.remove(name)

            elif status == State.CANCELLED:
                LOGGER.info("Step '%s' was cancelled.", name)
                self.in_progress.remove(name)
                record.mark_end(State.CANCELLED)
                cancel_steps.update(self.bfs_subtree(name)[0])

        # Let's handle all the failed steps in one go.
        for node in cleanup_steps:
            self.failed_steps.add(node)
            self.values[node].mark_end(State.FAILED)

        # Handle dependent steps that need cancelling
        for node in cancel_steps:
            self.cancelled_steps.add(node)
            self.values[node].mark_end(State.CANCELLED)

    # Now that we've checked the statuses of existing jobs we need to make
    # sure dependencies haven't been met.
    for key in self.values.keys():
        # We MUST dereference from the key. If we use values.items(), a
        # generator gets produced which will give us a COPY of a record and
        # not the actual record.
        record = self.values[key]

        # A completed step by definition has had its dependencies met.
        # Skip it.
        if key in self.completed_steps:
            LOGGER.debug("'%s' in completed set, skipping.", key)
            continue

        LOGGER.debug("Checking %s -- %s", key, record.jobid)
        # If the record is only INITIALIZED, we have encountered a step
        # that needs consideration.
        if record.status == State.INITIALIZED:
            LOGGER.debug("'%s' found to be initialized. Checking "
                         "dependencies. ", key)

            LOGGER.debug(
                "Unfulfilled dependencies: %s",
                self._dependencies[key])

            s_completed = list(filter(
                lambda x: x in self.completed_steps,
                self._dependencies[key]))
            self._dependencies[key] = \
                self._dependencies[key] - set(s_completed)
            LOGGER.debug(
                "Completed dependencies: %s\n"
                "Remaining dependencies: %s",
                s_completed, self._dependencies[key])

            # If the gating dependencies set is empty, we can execute.
            if not self._dependencies[key]:
                if key not in self.ready_steps:
                    LOGGER.debug("All dependencies completed. Staging.")
                    self.ready_steps.append(key)
                else:
                    LOGGER.debug("Already staged. Passing.")
                    continue

    # We now have a collection of ready steps. Execute.
    # If we don't have a submission limit, go ahead and submit all.
    if self._submission_throttle == 0:
        LOGGER.info("Launching all ready steps...")
        _available = len(self.ready_steps)
    # Else, we have a limit -- adhere to it.
    else:
        # Compute the number of available slots we have for execution.
        _available = self._submission_throttle - len(self.in_progress)
        # Available slots should never be negative, but on the off chance
        # we are in a slot deficit, then we will just say none are free.
        _available = max(0, _available)
        # Now, we need to take the min of the length of the queue and the
        # computed number of slots. We could have free slots, but have less
        # in the queue.
        _available = min(_available, len(self.ready_steps))
        LOGGER.info("Found %d available slots...", _available)

    for i in range(0, _available):
        # Pop the record and execute using the helper method.
        _record = self.values[self.ready_steps.popleft()]

        # If we get to this point and we've cancelled, cancel the record.
        if self.is_canceled:
            LOGGER.info("Cancelling '%s' -- continuing.", _record.name)
            _record.mark_end(State.CANCELLED)
            self.cancelled_steps.add(_record.name)
            continue

        LOGGER.debug("Launching job %d -- %s", i, _record.name)
        self._execute_record(_record, adapter)

    # check the status of the study upon finishing this round of execution
    completion_status = self._check_study_completion()
    return completion_status

generate_scripts()

Generate the scripts for all steps in the ExecutionGraph.

The generate_scripts method scans the ExecutionGraph instance and uses the stored adapter to write executable scripts for either local or scheduled execution. If a restart command is specified, a restart script will be generated for that record.

Source code in maestrowf/datastructures/core/executiongraph.py
def generate_scripts(self):
    """
    Generate the scripts for all steps in the ExecutionGraph.

    The generate_scripts method scans the ExecutionGraph instance and uses
    the stored adapter to write executable scripts for either local or
    scheduled execution. If a restart command is specified, a restart
    script will be generated for that record.
    """
    # An adapter must be specified
    if not self._adapter:
        msg = "Adapter not found. Specify a ScriptAdapter using " \
              "set_adapter."
        LOGGER.error(msg)
        raise ValueError(msg)

    # Set up the adapter.
    LOGGER.info("Generating scripts...")
    adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
    adapter = adapter(**self._adapter)

    self._check_tmp_dir()
    for key, record in self.values.items():
        if key == SOURCE:
            continue

        # Record generates its own script.
        record.setup_workspace()
        record.generate_script(adapter, self._tmp_dir)

log_description()

Log the description of the ExecutionGraph.

Source code in maestrowf/datastructures/core/executiongraph.py
def log_description(self):
    """Log the description of the ExecutionGraph."""
    desc = ["{}: {}".format(key, value)
            for key, value in self._description.items()]
    desc = "\n".join(desc)
    LOGGER.info(
        "\n==================================================\n"
        "%s\n"
        "==================================================\n",
        desc
    )

set_adapter(adapter)

Set the adapter used to interface for scheduling tasks.

Parameters:

Name Type Description Default
adapter

Adapter name to be used when launching the graph.

required
Source code in maestrowf/datastructures/core/executiongraph.py
def set_adapter(self, adapter):
    """
    Set the adapter used to interface for scheduling tasks.

    :param adapter: Adapter name to be used when launching the graph.
    """
    if not adapter:
        # If we have no adapter specified, assume sequential execution.
        self._adapter = None
        return

    if not isinstance(adapter, dict):
        msg = "Adapter settings must be contained in a dictionary."
        LOGGER.error(msg)
        raise TypeError(msg)

    # Check to see that the adapter type is something the
    if adapter["type"] not in ScriptAdapterFactory.get_valid_adapters():
        msg = "'{}' adapter must be specfied in ScriptAdapterFactory." \
              .format(adapter)
        LOGGER.error(msg)
        raise TypeError(msg)

    self._adapter = adapter

write_status(path)

Write the status of the DAG to a CSV file.

Source code in maestrowf/datastructures/core/executiongraph.py
def write_status(self, path):
    """Write the status of the DAG to a CSV file."""
    header = "Step Name,Job ID,Workspace,State,Run Time,Elapsed Time," \
             "Start Time,Submit Time,End Time,Number Restarts,Params"
    status = [header]

    for key in self.status_subtree:
        value = self.values[key]

        jobid_str = "--"
        if value.jobid:
            jobid_str = str(value.jobid[-1])

        # Include step root in workspace when parameterized
        if list(value.params.items()):
            ws = os.path.join(
                * os.path.normpath(
                    value.workspace.value).split(os.sep)[-2:]
            )
        else:
            ws = os.path.split(value.workspace.value)[1]

        _ = [
                value.name, jobid_str,
                ws,
                str(value.status.name), value.run_time, value.elapsed_time,
                value.time_start, value.time_submitted, value.time_end,
                str(value.restarts),
                ";".join(["{}:{}".format(param, value)
                          for param, value in value.params.items()])
            ]
        _ = ",".join(_)
        status.append(_)

    stat_path = os.path.join(path, "status.csv")
    lock_path = os.path.join(path, ".status.lock")
    lock = FileLock(lock_path)
    try:
        with lock.acquire(timeout=10):
            with open(stat_path, "w+") as stat_file:
                stat_file.write("\n".join(status))
    except Timeout:
        pass