seisflows.system.fujitsu

Fujitsu brand clusters run their own Job Management System (Fujitsu Technical Computing Suite).

Note

The nickname PJM, based on the batch job script directives, may be used as a shorthand to refer to the Fujitsu job management system.

PJM is similar to PBS/TORQUE but has its own unique peculiarities. Systems like UToyko’s Wisteria, and Fugaku (one of the top supercomputers in the world) run using the Fujitsu job scheduler.

Classes

Fujitsu

System Fujitsu

Module Contents

class seisflows.system.fujitsu.Fujitsu(ntask_max=None, pjm_args='', **kwargs)

Bases: seisflows.system.cluster.Cluster

System Fujitsu

Interface for submitting and monitoring jobs on HPC systems running the Fujitsu job management system, a.k.a PJM

Parameters

type pjm_args:

str

param pjm_args:

Any (optional) additional PJM arguments that will be passed to the pjsub scripts. Should be in the form: ‘–key1=value1 –key2=value2”

Paths

***

__doc__
ntask_max
pjm_args = ''
group = None
rscgrp = None
gpu = None
_rscgrps
_completed_states = ['END']
_failed_states = ['CANCEL', 'HOLD', 'ERROR']
_pending_states = ['QUEUED', 'RUNNING']
check()

Checks parameters and paths

property nodes

Defines the number of nodes which is derived from system node size

property node_size

Defines the node size of a given cluster partition. This is a hard set number defined by the system architecture

property submit_call_header

The submit call defines the PJM header which is used to submit a workflow task list to the system. It is usually dictated by the system’s required parameters, such as account names and partitions. Submit calls are modified and called by the submit function.

Return type:

str

Returns:

the system-dependent portion of a submit call

run_call(executable='', tasktime=None)

The run call defines the PJM header which is used to run tasks during an executing workflow. Like the submit call its arguments are dictated by the given system. Run calls are modified and called by the run function

Return type:

str

Returns:

the system-dependent portion of a run call

submit(workdir=None, parameter_file='parameters.yaml', direct=True)

Submit main workflow to the System. Two options are available, submitting a Python job directly to the system, or submitting a subprocess.

Parameters:
  • workdir (str) – path to the current working directory

  • parameter_file (str) – paramter file file name used to instantiate the SeisFlows package

  • direct (bool) – (used for overriding system modules) submits the main workflow job directly to the login node as a Python process (default). If False, submits the main job as a separate subprocess. Note that this is Fujitsu specific and main jobs should be run from interactive jobs run on compute nodes to avoid running jobs on shared login resources

static _stdout_to_job_id(stdout)

The stdout message after a PJSUB job is submitted, from which we get the job number, example std output after submission:

[INFO] PJM 0000 pjsub Job 1334958 submitted.

Parameters:

stdout (str) – standard PJM output that is gathered from subprocess run

Return type:

str

Returns:

a matching job ID. We convert str->int->str to ensure that the job id is an integer value (which it must be)

Raises:

SystemExit – if the job id does not evaluate as an integer

run(funcs, single=False, tasktime=None, **kwargs)

Runs task multiple times in embarrassingly parallel fasion on a PJM cluster. Executes the list of functions (funcs) NTASK times with each task occupying NPROC cores.

Note

Because Wisteria does not have an option similar to Slurm’s ntask_max which limits the amount of concurrently running jobs, AND the system does not allow >8 concurrently submitted jobs (this will result in a PJM submission error), I had to completely rewrite the architecture for the Cluster.run() command.

Parameters:
  • funcs (list of methods) – a list of functions that should be run in order. All kwargs passed to run() will be passed into the functions.

  • single (bool) – run a single-process, non-parallel task, such as smoothing the gradient, which only needs to be run by once. This will change how the job array and the number of tasks is defined, such that the job is submitted as a single-core job to the system.

monitor_job_status(job_id, timeout_s=300, wait_time_s=15, complete_all=False)

Repeatedly check the status of currently running job(s) in a clusters’ queue. If the job goes into a bad state (like ‘FAILED’), log the failing job’s id and their states. Returns a state of 1 if all jobs complete nominally, returns a state of -1 if any nonzero number of jobs returns a non-complete status.

TO DO: probably

Note

This function does nothing in the standalone Cluster module, but it is used by child classes, and so defines a usable code they can inherit. Originally this monitoring system was written for the SLURM workload manager, but it is generalized.

Parameters:
  • job_id (str or list) – main job id(s) to query, returned from the subprocess.run that ran the job(s). - if single value, we expect that jobs have been submitted as a job array (e.g., 1_0, 1_1, 1_2) -if a list, we expect that the jobs have been submitted with independent job numbers (e.g, 0, 1, 2)

  • timeout_s (float) – length of time [s] to wait for system to respond nominally before rasing a TimeoutError. Sometimes it takes a while for job monitoring to start working.

  • wait_time_s (float) – wait time interval to allow System to initialize jobs in the queue and to prevent the check system from constantly querying the queue system.

Return type:

int

Returns:

status of all running jobs. 1 for pass (all jobs COMPLETED). -1 for fail (one or more jobs returned failing status)

Raises:

TimeoutError – if ‘sacct’ does not return any output for ~1 min.

query_job_states(job_id, timeout_s=300, wait_time_s=30, _recheck=0)

Overwrites system.cluster.Cluster.query_job_states

Queries completion status of array job by running the PJM cmd pjstat Because pjstat treats running and finished jobs separately, we need to query both the current running processes, and the history

Note

The actual command line call wil look something like this

$ pjstat Wisteria/BDEC-01 scheduled stop time: 2023/03/31(Fri) 09:00:00 (Remain: 28days 21:34:40)

JOB_ID JOB_NAME STATUS PROJECT RSCGROUP START_DATE ELAPSE TOKEN NODEGPU 1334861 STDIN END gr58interactive-o_n1 03/02 11:16:33 00:02:14 - 1 -

Note

pjstat flag options are described as follows: -H: get the history (non-running jobs)

Parameters:
  • job_id (str) – main job id to query, returned from the subprocess.run that ran the jobs

  • timeout_s (float) – length of time [s] to wait for system to respond nominally before rasing a TimeoutError. Sometimes it takes a while for job monitoring to start working.

  • wait_time_s (float) – initial wait time to allow System to initialize jobs in the queue. I.e., how long do we expect it to take before the job we submitted shows up in the queue.

  • _recheck (int) – Used for recursive calling of the function. It can take time for jobs to be initiated on a system, which may result in the stdout of the ‘sacct’ command to be empty. In this case we wait and call the function again. Rechecks are used to prevent endless loops by putting a stop criteria

Raises:

TimeoutError – if ‘pjstat’ does not return any output for ~1 min.