seisflows.system.fujitsu
========================

.. py:module:: seisflows.system.fujitsu

.. autoapi-nested-parse::

   Fujitsu brand clusters run their own Job Management System (Fujitsu
   Technical Computing Suite).

   .. note::

       The nickname `PJM`, based on the batch job script directives, may be
       used as a shorthand to refer to the Fujitsu job management system.

   PJM is similar to PBS/TORQUE but has its own unique peculiarities. Systems
   like UTokyo's Wisteria, and Fugaku (one of the top supercomputers in the
   world) run using the Fujitsu job scheduler.


Classes
-------

.. autoapisummary::

   seisflows.system.fujitsu.Fujitsu


Module Contents
---------------

.. py:class:: Fujitsu(ntask_max=None, pjm_args='', **kwargs)

   Bases: :py:obj:`seisflows.system.cluster.Cluster`

   System Fujitsu
   --------------
   Interface for submitting and monitoring jobs on HPC systems running the
   Fujitsu job management system, a.k.a. PJM.

   Parameters
   ----------
   :type pjm_args: str
   :param pjm_args: Any (optional) additional PJM arguments that will be
       passed to the pjsub scripts. Should be in the form:
       '--key1=value1 --key2=value2'

   Paths
   -----

   ***

   .. py:attribute:: __doc__

   .. py:attribute:: ntask_max

   .. py:attribute:: pjm_args
      :value: ''

   .. py:attribute:: group
      :value: None

   .. py:attribute:: rscgrp
      :value: None

   .. py:attribute:: gpu
      :value: None

   .. py:attribute:: _rscgrps

   .. py:attribute:: _completed_states
      :value: ['END']

   .. py:attribute:: _failed_states
      :value: ['CANCEL', 'HOLD', 'ERROR']

   .. py:attribute:: _pending_states
      :value: ['QUEUED', 'RUNNING']

   .. py:method:: check()

      Checks parameters and paths.

   .. py:property:: nodes

      Defines the number of nodes, which is derived from the system node size.

   .. py:property:: node_size

      Defines the node size of a given cluster partition. This is a hard-set
      number defined by the system architecture.

   .. py:property:: submit_call_header

      The submit call defines the PJM header which is used to submit a
      workflow task list to the system.
      It is usually dictated by the system's required parameters, such as
      account names and partitions. Submit calls are modified and called by
      the `submit` function.

      :rtype: str
      :return: the system-dependent portion of a submit call

   .. py:method:: run_call(executable='', tasktime=None)

      The run call defines the PJM header which is used to run tasks during
      an executing workflow. Like the submit call, its arguments are dictated
      by the given system. Run calls are modified and called by the `run`
      function.

      :rtype: str
      :return: the system-dependent portion of a run call

   .. py:method:: submit(workdir=None, parameter_file='parameters.yaml', direct=True)

      Submit main workflow to the System. Two options are available:
      submitting a Python job directly to the system, or submitting a
      subprocess.

      :type workdir: str
      :param workdir: path to the current working directory
      :type parameter_file: str
      :param parameter_file: parameter file name used to instantiate the
          SeisFlows package
      :type direct: bool
      :param direct: (used for overriding system modules) submits the main
          workflow job directly to the login node as a Python process
          (default). If False, submits the main job as a separate subprocess.
          Note that this is Fujitsu specific, and main jobs should be run
          from interactive jobs on compute nodes to avoid running jobs on
          shared login resources.

   .. py:method:: _stdout_to_job_id(stdout)
      :staticmethod:

      Gets the job number from the stdout message that is printed after a
      PJSUB job is submitted. Example stdout after submission:

      [INFO] PJM 0000 pjsub Job 1334958 submitted.

      :type stdout: str
      :param stdout: standard PJM output that is gathered from subprocess run
      :rtype: str
      :return: a matching job ID. We convert str->int->str to ensure that
          the job id is an integer value (which it must be)
      :raises SystemExit: if the job id does not evaluate as an integer

   .. py:method:: run(funcs, single=False, tasktime=None, **kwargs)

      Runs tasks multiple times in an embarrassingly parallel fashion on a
      PJM cluster.
      Executes the list of functions (`funcs`) NTASK times with each task
      occupying NPROC cores.

      .. note::

          Because Wisteria does not have an option similar to Slurm's
          `ntask_max` which limits the number of concurrently running jobs,
          AND the system does not allow >8 concurrently submitted jobs (this
          will result in a PJM submission error), I had to completely rewrite
          the architecture for the `Cluster.run()` command.

      :type funcs: list of methods
      :param funcs: a list of functions that should be run in order. All
          kwargs passed to run() will be passed into the functions.
      :type single: bool
      :param single: run a single-process, non-parallel task, such as
          smoothing the gradient, which only needs to be run once. This will
          change how the job array and the number of tasks is defined, such
          that the job is submitted as a single-core job to the system.

   .. py:method:: monitor_job_status(job_id, timeout_s=300, wait_time_s=15, complete_all=False)

      Repeatedly check the status of currently running job(s) in a cluster's
      queue. If a job goes into a bad state (like 'FAILED'), log the failing
      jobs' ids and their states. Returns a state of 1 if all jobs complete
      nominally, and a state of -1 if any nonzero number of jobs returns a
      non-complete status.

      TO DO: probably

      .. note::

          This function does nothing in the standalone `Cluster` module, but
          it is used by child classes, and so defines usable code they can
          inherit. Originally this monitoring system was written for the
          SLURM workload manager, but it is generalized.

      :type job_id: str or list
      :param job_id: main job id(s) to query, returned from the
          subprocess.run that ran the job(s).

          - if a single value, we expect that jobs have been submitted as a
            job array (e.g., 1_0, 1_1, 1_2)
          - if a list, we expect that the jobs have been submitted with
            independent job numbers (e.g., 0, 1, 2)
      :type timeout_s: float
      :param timeout_s: length of time [s] to wait for system to respond
          nominally before raising a TimeoutError.
          Sometimes it takes a while for job monitoring to start working.
      :type wait_time_s: float
      :param wait_time_s: wait time interval to allow System to initialize
          jobs in the queue and to prevent the check system from constantly
          querying the queue system.
      :rtype: int
      :return: status of all running jobs. 1 for pass (all jobs COMPLETED).
          -1 for fail (one or more jobs returned failing status)
      :raises TimeoutError: if 'sacct' does not return any output for ~1 min.

   .. py:method:: query_job_states(job_id, timeout_s=300, wait_time_s=30, _recheck=0)

      Overrides `system.cluster.Cluster.query_job_states`.

      Queries completion status of array job by running the PJM cmd `pjstat`.
      Because `pjstat` treats running and finished jobs separately, we need
      to query both the current running processes and the history.

      .. note::

          The actual command line call will look something like this::

              $ pjstat
              Wisteria/BDEC-01 scheduled stop time: 2023/03/31(Fri) 09:00:00 (Remain: 28days 21:34:40)

              JOB_ID   JOB_NAME  STATUS  PROJECT  RSCGROUP          START_DATE      ELAPSE    TOKEN  NODE  GPU
              1334861  STDIN     END     gr58     interactive-o_n1  03/02 11:16:33  00:02:14  -      1     -

      .. note::

          `pjstat` flag options are described as follows:

          -H: get the history (non-running jobs)

      :type job_id: str
      :param job_id: main job id to query, returned from the subprocess.run
          that ran the jobs
      :type timeout_s: float
      :param timeout_s: length of time [s] to wait for system to respond
          nominally before raising a TimeoutError. Sometimes it takes a while
          for job monitoring to start working.
      :type wait_time_s: float
      :param wait_time_s: initial wait time to allow System to initialize
          jobs in the queue, i.e., how long we expect it to take before the
          job we submitted shows up in the queue.
      :type _recheck: int
      :param _recheck: Used for recursive calling of the function. It can
          take time for jobs to be initiated on a system, which may result in
          the stdout of the 'pjstat' command being empty. In this case we
          wait and call the function again.
          Rechecks are used to prevent endless loops by imposing a stopping
          criterion.
      :raises TimeoutError: if 'pjstat' does not return any output for ~1 min.
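
The job ID extraction and state classification described above can be sketched as follows. This is a minimal illustration, not the SeisFlows implementation: the helper names `parse_job_id` and `classify_state` are hypothetical, the regular expression is only assumed to fit the example `pjsub` message shown in `_stdout_to_job_id`, the return value 0 for a pending job is an assumption (the documentation only defines 1 and -1), and the sketch raises `ValueError` where the documented method raises `SystemExit`.

```python
import re

# State lists copied from the attribute values documented for `Fujitsu`
COMPLETED_STATES = ["END"]
FAILED_STATES = ["CANCEL", "HOLD", "ERROR"]
PENDING_STATES = ["QUEUED", "RUNNING"]


def parse_job_id(stdout):
    """Hypothetical helper: extract the job ID from a pjsub stdout message."""
    # Assumes the message looks like: "[INFO] PJM 0000 pjsub Job 1334958 submitted."
    match = re.search(r"Job\s+(\d+)\s+submitted", stdout)
    if match is None:
        raise ValueError(f"no job id found in: {stdout!r}")
    # str -> int -> str round trip ensures the ID is an integer value
    return str(int(match.group(1)))


def classify_state(state):
    """Hypothetical helper: map a pjstat STATUS string to a status code."""
    if state in COMPLETED_STATES:
        return 1   # job completed nominally
    if state in FAILED_STATES:
        return -1  # job ended in a failing state
    if state in PENDING_STATES:
        return 0   # assumption: 0 marks a job that is still queued or running
    raise ValueError(f"unrecognized job state: {state!r}")


if __name__ == "__main__":
    print(parse_job_id("[INFO] PJM 0000 pjsub Job 1334958 submitted."))  # 1334958
    print(classify_state("END"))  # 1
```

Keeping the state lists as plain module-level constants mirrors the documented `_completed_states`, `_failed_states`, and `_pending_states` attributes, so a scheduler with different status strings would only need different lists.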