seisflows.system.cluster
The Cluster class provides the core utilities for interacting with HPC systems, which must be overloaded by subclasses for specific workload managers or specific clusters.
The Cluster class acts as a base class for more specific cluster implementations (like SLURM). However, it can also be used standalone. When running jobs on the Cluster system, jobs are submitted to the master system using subprocess.run, mimicking how jobs would be run on a cluster without actually submitting to any job scheduler.
Classes
Cluster: Cluster System
Module Contents
- class seisflows.system.cluster.Cluster(title=None, mpiexec='', ntask_max=None, walltime=10, environs='', **kwargs)
Bases:
seisflows.system.workstation.Workstation
Cluster System
Generic or common HPC/cluster interfacing commands
Parameters
title (str) – The name used to submit jobs to the system; defaults to the name of the current working directory.
mpiexec (str) – Command used to invoke executables on the system, for example ‘mpirun’, ‘mpiexec’, ‘srun’ or ‘ibrun’.
ntask_max (int) – Limits the number of concurrent tasks in a given array job. If you are running the Cluster system directly on a workstation or on the login node of your cluster, try to keep ntask_max * nproc at or below the number of cores on your system, otherwise you may face memory allocation errors.
walltime (float) – Maximum job time in minutes for the master SeisFlows job submitted to the cluster. Fractions of minutes are acceptable.
environs (str) – Optional environment variables, provided in the format VAR1=var1,VAR2=var2… They will be set using os.environ.
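The environs format described above can be illustrated with a minimal sketch. The helper name set_environs is hypothetical and is not part of SeisFlows; it only shows one way a VAR1=var1,VAR2=var2 string could be split and exported through os.environ.

```python
import os


def set_environs(environs):
    """Parse 'VAR1=var1,VAR2=var2' and export each pair via os.environ.

    Hypothetical helper for illustration only, not the SeisFlows code.
    Returns the parsed pairs as a dictionary.
    """
    parsed = {}
    for pair in environs.split(","):
        pair = pair.strip()
        if not pair:
            # Skip empty entries, e.g. when environs is the default ''
            continue
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected VAR=value, got '{pair}'")
        parsed[key.strip()] = value.strip()
    # Make the variables visible to this process and its child processes
    os.environ.update(parsed)
    return parsed


if __name__ == "__main__":
    print(set_environs("SF_DEMO_A=1,SF_DEMO_B=two"))
```

Because child processes inherit the parent environment by default, variables set this way would also be visible to tasks launched with subprocess.run.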
Paths
- __doc__
- submit_workflow
- run_functions
- mpiexec = ''
- ntask_max
- walltime = 10
- environs = ''
- _failed_states = []
- _completed_states = []
- _pending_states = []
- property submit_call_header
The submit call defines the SBATCH header which is used to submit a workflow task list to the system. It is usually dictated by the system’s required parameters, such as account names and partitions. Submit calls are modified and called by the submit function.
Note
The generalized Cluster returns an empty string, but child system classes will need to override the submit call.
- Return type:
str
- Returns:
the system-dependent portion of a submit call
- property run_call_header
The run call defines the SBATCH header which is used to run tasks during an executing workflow. Like the submit call, its arguments are dictated by the given system. Run calls are modified and called by the run function.
Note
The generalized Cluster returns an empty string, but child system classes will need to override the run call.
- Return type:
str
- Returns:
the system-dependent portion of a run call
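The relationship between the two header properties and a child class can be sketched as follows. Both classes are hypothetical stand-ins written for this example (they do not import or reproduce SeisFlows code), and the partition and ntask attributes are assumptions; the sbatch options shown (--job-name, --partition, --time, --array) are standard SLURM flags.

```python
class ClusterSketch:
    """Stand-in for the generic base class: both headers are empty strings."""

    def __init__(self, title="demo", walltime=10, ntask=1, ntask_max=1):
        self.title = title
        self.walltime = walltime
        self.ntask = ntask
        self.ntask_max = ntask_max

    @property
    def submit_call_header(self):
        return ""

    @property
    def run_call_header(self):
        return ""


class HypotheticalSlurm(ClusterSketch):
    """Child class that overrides both headers for an sbatch-style scheduler."""

    def __init__(self, partition="debug", **kwargs):
        super().__init__(**kwargs)
        self.partition = partition  # assumed, system-specific parameter

    @property
    def submit_call_header(self):
        # Header for the single master job
        return " ".join([
            "sbatch",
            f"--job-name={self.title}",
            f"--partition={self.partition}",
            f"--time={self.walltime}",
        ])

    @property
    def run_call_header(self):
        # Header for an array job, capped at ntask_max concurrent tasks
        return " ".join([
            "sbatch",
            f"--job-name={self.title}",
            f"--partition={self.partition}",
            f"--array=0-{self.ntask - 1}%{self.ntask_max}",
        ])


if __name__ == "__main__":
    system = HypotheticalSlurm(title="inversion", walltime=30, ntask=4, ntask_max=2)
    print(system.submit_call_header)
    print(system.run_call_header)
```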
- submit(workdir=None, parameter_file='parameters.yaml', direct=False)
Submits the main workflow job as a separate job, directly on the system that is running the master job.
Note
print statements are used rather than log statements because the logger will not yet have been instantiated
- Parameters:
workdir (str) – path to the current working directory
parameter_file (str) – parameter file name used to instantiate the SeisFlows package
direct (bool) – (used for overriding system modules) submits the main workflow job directly to the login node rather than as a separate process on a compute node. Avoids queue times and walltimes but may be discouraged by sys admins as some array processing will take place on the shared login node.
- run(funcs, single=False, tasktime=None, _retry=0, **kwargs)
Runs tasks multiple times in parallel by submitting NTASK new jobs to the system. The list of functions and its kwargs are saved as pickle files, and then re-loaded by each submitted process with specific environment variables. Each spawned process will run the list of functions.
Warning
Logging parameters verbose and log_level are passed to each compute job through the pickled kwargs file. This assumes that NONE of the functions that are being passed through also have these variable names as arguments, otherwise there will be conflicting arguments. Probably this won’t be the case because these variable names are generally reserved for logging purposes.
- Parameters:
funcs (list of methods) – a list of functions that should be run in order. All kwargs passed to run() will be passed into the functions.
single (bool) – run a single-process, non-parallel task, such as smoothing the gradient, which only needs to be run once. This changes how the job array and the number of tasks are defined, such that the job is submitted to the system as a single-core job.
tasktime (float) – Custom tasktime in units of minutes for running the given functions funcs. If not given, defaults to the System variable tasktime. If System tasktime is also None, defaults to no tasktime (infinite time). If tasks exceed the given tasktime, the program will exit.
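The calling pattern described by these parameters can be sketched in a simplified form. This is not the SeisFlows implementation: it skips the pickle step, uses threads instead of separately launched processes, and passes a hypothetical task_id keyword to each function where the real method relies on environment variables. The names run_sketch, preprocess and evaluate are invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def run_sketch(funcs, ntask, ntask_max=None, single=False, **kwargs):
    """Run every function in funcs, in order, once per task.

    At most ntask_max tasks run concurrently. With single=True only
    one task (id 0) is run, regardless of ntask.
    """
    task_ids = [0] if single else list(range(ntask))

    def run_one(task_id):
        # Each task runs the whole function list in order with shared kwargs
        return [func(task_id=task_id, **kwargs) for func in funcs]

    with ThreadPoolExecutor(max_workers=ntask_max) as pool:
        # pool.map returns results in task id order
        return list(pool.map(run_one, task_ids))


def preprocess(task_id, scale):
    """Hypothetical first step of a task."""
    return task_id * scale


def evaluate(task_id, scale):
    """Hypothetical second step of a task."""
    return task_id + scale


if __name__ == "__main__":
    print(run_sketch([preprocess, evaluate], ntask=3, ntask_max=2, scale=10))
    print(run_sketch([preprocess, evaluate], ntask=3, single=True, scale=10))
```

Note that every keyword argument is forwarded to every function in the list, which is why argument names shared with logging parameters would conflict, as the warning above describes.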
- _run_task(run_call, task_id)
Convenience function to run a single Python job with subprocess.run with some error catching and redirect of stdout to a log file.
- Parameters:
run_call (str) – python call to run a task involving loading the pickled function list and its kwargs, and then running them
task_id (int) – given task id, used for log messages and to format the run call
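A minimal sketch of what such a convenience function might look like is given below. It is an assumption-laden illustration rather than the SeisFlows code: the function name run_task, the log_dir argument, the log file naming and the {task_id} placeholder in the run call are all hypothetical.

```python
import shlex
import subprocess
import sys
import tempfile
from pathlib import Path


def run_task(run_call, task_id, log_dir):
    """Run one task with subprocess.run, redirecting output to a log file.

    Returns True if the command exits with status 0, otherwise False.
    """
    log_file = Path(log_dir) / f"{task_id:04d}.log"
    # Fill the task id into the call, then split it into an argument list
    cmd = shlex.split(run_call.format(task_id=task_id))
    with open(log_file, "w") as f:
        try:
            # check=True raises CalledProcessError on a nonzero exit status
            subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT, check=True)
        except subprocess.CalledProcessError:
            return False
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        call = shlex.quote(sys.executable) + " -c \"print('task', {task_id})\""
        print(run_task(call, 3, tmp))
        print((Path(tmp) / "0003.log").read_text())
```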
- monitor_job_status(job_id, timeout_s=300, wait_time_s=15)
Repeatedly check the status of currently running job(s) in a cluster’s queue. If any job goes into a bad state (like ‘FAILED’), log the ids and states of the failing jobs. Returns a state of 1 if all jobs complete nominally, and a state of -1 if one or more jobs return a non-complete status.
Note
This function does nothing in the standalone Cluster module, but it is used by child classes, and so defines usable code that they can inherit. Originally this monitoring system was written for the SLURM workload manager, but it is generalized.
- Parameters:
job_id (str or list) – main job id(s) to query, returned from the subprocess.run that ran the job(s). If a single value, we expect that jobs have been submitted as a job array (e.g., 1_0, 1_1, 1_2); if a list, we expect that the jobs have been submitted with independent job numbers (e.g., 0, 1, 2).
timeout_s (float) – length of time [s] to wait for the system to respond nominally before raising a TimeoutError. Sometimes it takes a while for job monitoring to start working.
wait_time_s (float) – wait time interval to allow System to initialize jobs in the queue and to prevent the check system from constantly querying the queue system.
- Return type:
int
- Returns:
status of all running jobs. 1 for pass (all jobs COMPLETED). -1 for fail (one or more jobs returned failing status)
- Raises:
TimeoutError – if ‘sacct’ does not return any output for ~1 min.
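The polling logic described above can be sketched roughly as follows. The state names are assumptions modelled on SLURM (the generic Cluster leaves its state lists empty), the query function is injected by the caller as a stand-in for query_job_states, and the timeout handling is omitted for brevity.

```python
import time

# Assumed state names; a child class would define its own lists
COMPLETED_STATES = {"COMPLETED"}
FAILED_STATES = {"FAILED", "TIMEOUT", "CANCELLED"}


def monitor(query_job_states, job_id, wait_time_s=0.0):
    """Poll job states until all jobs complete (1) or any job fails (-1).

    query_job_states is any callable that takes a job id and returns
    (list of job ids, list of states), mirroring the documented return type.
    """
    while True:
        job_ids, states = query_job_states(job_id)
        failed = [(j, s) for j, s in zip(job_ids, states) if s in FAILED_STATES]
        if failed:
            for j, s in failed:
                print(f"job {j} ended with state {s}")
            return -1
        if states and all(s in COMPLETED_STATES for s in states):
            return 1
        # Wait before querying again to avoid hammering the queue system
        time.sleep(wait_time_s)


if __name__ == "__main__":
    polls = iter([["PENDING", "RUNNING"], ["COMPLETED", "COMPLETED"]])
    print(monitor(lambda job_id: (["1_0", "1_1"], next(polls)), "1"))
```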
- abstractmethod query_job_states(job_id, timeout_s=300, wait_time_s=30, _recheck=0)
Queries completion status of an array job. This function is required by child classes and is set here as an empty template.
Note
MUST be handled by child class for a specific cluster. See System.Slurm for one example of how this is handled.
- Parameters:
job_id (str) – job id to query
timeout_s (float) – length of time [s] to wait for the system to respond nominally before raising a TimeoutError. Sometimes it takes a while for job monitoring to start working.
wait_time_s (float) – initial wait time to allow System to initialize jobs in the queue. I.e., how long do we expect it to take before the job we submitted shows up in the queue.
_recheck (int) – A counter used for recursive calling of the function
- Return type:
(list, list)
- Returns:
(list of job id(s), list of states for each job)
- Raises:
TimeoutError – if no output is received from System after