seisflows.system.cluster ======================== .. py:module:: seisflows.system.cluster .. autoapi-nested-parse:: The Cluster class provides the core utilities interaction with HPC systems which must be overloaded by subclasses for specific workload managers, or specific clusters. The `Cluster` class acts as a base class for more specific cluster implementations (like SLURM). However it can be used standalone. When running jobs on the `Cluster` system, jobs will be submitted to the master system using `subprocess.run`, mimicing how jobs would be run on a cluster but not actually submitting to any job scheduler. Classes ------- .. autoapisummary:: seisflows.system.cluster.Cluster Module Contents --------------- .. py:class:: Cluster(title=None, mpiexec='', ntask_max=None, walltime=10, environs='', **kwargs) Bases: :py:obj:`seisflows.system.workstation.Workstation` Cluster System -------------- Generic or common HPC/cluster interfacing commands Parameters ---------- :type title: str :param title: The name used to submit jobs to the system, defaults to the name of the current working directory :type mpiexec: str :param mpiexec: Function used to invoke executables on the system. For example 'mpirun', 'mpiexec', 'srun', 'ibrun' :type ntask_max: int :param ntask_max: limit the number of concurrent tasks in a given array job. Note that if you are directly running the Cluster system on a workstation or the login node of your cluster, try to adhere to the number of cores on your system should be <= `ntask_max` * `nproc`, otherwise you may face memory allocation errors :type walltime: float :param walltime: maximum job time in minutes for the master SeisFlows job submitted to cluster. Fractions of minutes acceptable. :type environs: str :param environs: Optional environment variables to be provided in the following format VAR1=var1,VAR2=var2... Will be set using os.environs Paths ----- *** .. py:attribute:: __doc__ .. py:attribute:: submit_workflow .. py:attribute:: run_functions .. py:attribute:: mpiexec :value: '' .. py:attribute:: ntask_max .. py:attribute:: walltime :value: 10 .. py:attribute:: environs :value: '' .. py:attribute:: _failed_states :value: [] .. py:attribute:: _completed_states :value: [] .. py:attribute:: _pending_states :value: [] .. py:property:: submit_call_header The submit call defines the SBATCH header which is used to submit a workflow task list to the system. It is usually dictated by the system's required parameters, such as account names and partitions. Submit calls are modified and called by the `submit` function. .. note:: Generalized `cluster` returns empty string but child system classes will need to overwrite the submit call. :rtype: str :return: the system-dependent portion of a submit call .. py:property:: run_call_header The run call defines the SBATCH header which is used to run tasks during an executing workflow. Like the submit call its arguments are dictated by the given system. Run calls are modified and called by the `run` function .. note:: Generalized `cluster` returns empty string but child system classes will need to overwrite the submit call. :rtype: str :return: the system-dependent portion of a run call .. py:method:: submit(workdir=None, parameter_file='parameters.yaml', direct=False) Submits the main workflow job as a separate job submitted directly to the system that is running the master job .. note:: print statements are used rather than log statements because the logger will not yet have been instantiated :type workdir: str :param workdir: path to the current working directory :type parameter_file: str :param parameter_file: paramter file file name used to instantiate the SeisFlows package :type direct: bool :param direct: (used for overriding system modules) submits the main workflow job directly to the login node rather than as a separate process on a compute node. Avoids queue times and walltimes but may be discouraged by sys admins as some array processing will take place on the shared login node. .. py:method:: run(funcs, single=False, tasktime=None, _retry=0, **kwargs) Runs tasks multiple times in parallel by submitting NTASK new jobs to system. The list of functions and its kwargs are saved as pickles files, and then re-loaded by each submitted process with specific environment variables. Each spawned process will run the list of functions. .. warning:: Logging parameters `verbose` and `log_level` are passed to each compute job through the pickled kwargs file. This assumes that NONE of the functions that are being passed through also have these variable names as arguments, otherwise there will be conflicting arguments. Probably this won't be the case because these variable names are generally reserved for logging purposes. :type funcs: list of methods :param funcs: a list of functions that should be run in order. All kwargs passed to run() will be passed into the functions. :type single: bool :param single: run a single-process, non-parallel task, such as smoothing the gradient, which only needs to be run by once. This will change how the job array and the number of tasks is defined, such that the job is submitted as a single-core job to the system. :type tasktime: float :param tasktime: Custom tasktime in units minutes for running the given functions `funcs`. If not given, defaults to the System variable `tasktime`. If System `tasktime` is also None, defaults to no tasktime (inifinty time). If tasks exceed the given `tasktime`, the program will exit .. py:method:: _run_task(run_call, task_id) Convenience function to run a single Python job with subprocess.run with some error catching and redirect of stdout to a log file. :type run_call: str :param run_call: python call to run a task involving loading the pickled function list and its kwargs, and then running them :type task_id: int :param task_id: given task id, used for log messages and to format the run call .. py:method:: monitor_job_status(job_id, timeout_s=300, wait_time_s=15) Repeatedly check the status of currently running job(s) in a clusters' queue. If the job goes into a bad state (like 'FAILED'), log the failing job's id and their states. Returns a state of 1 if all jobs complete nominally, returns a state of -1 if any nonzero number of jobs returns a non-complete status. .. note:: This function does nothing in the standalone `Cluster` module, but it is used by child classes, and so defines a usable code they can inherit. Originally this monitoring system was written for the SLURM workload manager, but it is generalized. :type job_id: str or list :param job_id: main job id(s) to query, returned from the subprocess.run that ran the job(s). - if single value, we expect that jobs have been submitted as a job array (e.g., 1_0, 1_1, 1_2) -if a list, we expect that the jobs have been submitted with independent job numbers (e.g, 0, 1, 2) :type timeout_s: float :param timeout_s: length of time [s] to wait for system to respond nominally before rasing a TimeoutError. Sometimes it takes a while for job monitoring to start working. :type wait_time_s: float :param wait_time_s: wait time interval to allow System to initialize jobs in the queue and to prevent the check system from constantly querying the queue system. :rtype: int :return: status of all running jobs. 1 for pass (all jobs COMPLETED). -1 for fail (one or more jobs returned failing status) :raises TimeoutError: if 'sacct' does not return any output for ~1 min. .. py:method:: query_job_states(job_id, timeout_s=300, wait_time_s=30, _recheck=0) :abstractmethod: Queries completion status of an array job. This function is required by child classes and is set here as an empty template. .. note:: MUST be handled by child class for a specific cluster. See `System.Slurm` for one example of how this is handled. :type job_id: str :param job_id: job id to query :type timeout_s: float :param timeout_s: length of time [s] to wait for system to respond nominally before rasing a TimeoutError. Sometimes it takes a while for job monitoring to start working. :type wait_time_s: float :param wait_time_s: initial wait time to allow System to initialize jobs in the queue. I.e., how long do we expect it to take before the job we submitted shows up in the queue. :type _recheck: int :param _recheck: A counter used for recursive calling of the function :rtype: (list, list) :return: (list of job id(s), list of states for each job) :raises TimeoutError: if no output is received from System after