Source code for rsopt.libe_tools.executors

import os
import logging

from libensemble.executors.mpi_executor import MPIExecutor
from libensemble.executors.executor import Executor, Task, ExecutorException
from libensemble.resources.resources import Resources
from pykern import pkyaml
from rsopt import EXECUTOR_SCHEMA

EXECUTOR_SCHEMA = pkyaml.load_file(EXECUTOR_SCHEMA)
logger = logging.getLogger(__name__)
# To change logging level for just this module
# logger.setLevel(logging.DEBUG)

_RSMPI_CONFIG_PATH = EXECUTOR_SCHEMA['rsmpi']['config_path']


[docs]def register_rsmpi_executor(hosts: str or list = 'auto') -> MPIExecutor: """ Create an MPIExecutor that can use rsmpi. The executor is returned and may be used to register calculations for workers like any other libEnsemble executor. :param hosts: (str or list) If 'auto' then all rsmpi resources are detected and used. If a list is given the list entries are written to the libe_nodes file. :return: libensemble.executors.mpi_executor.MPIExecutor object """ schema = EXECUTOR_SCHEMA['rsmpi'] if type(hosts) == list: pass elif hosts == 'auto': hosts = _detect_rsmpi_resources() else: raise TypeError('hosts must be str or int') _generate_rsmpi_node_file(hosts) customizer = {k: schema[k] for k in ('mpi_runner', 'runner_name', 'subgroup_launch')} jobctrl = MPIExecutor(custom_info=customizer) # Set longer fail time - rsmpi is relatively slow to start jobctrl.fail_time = schema['fail_time'] return jobctrl
def _detect_rsmpi_resources() -> int: hosts = 0 assert os.path.isfile(_RSMPI_CONFIG_PATH), "rsmpi configuration does not exist" with open(_RSMPI_CONFIG_PATH, 'r') as ff: for line in ff.readlines(): if 'Host' in line: hosts += 1 assert hosts > 0, 'No hosts listed in rsmpi configuration' return hosts def _generate_rsmpi_node_file(nodes: int or list) -> None: with open('libe_nodes', 'w') as ff: if type(nodes) == int: for node in range(1, nodes+1): ff.write(str(node)+'\n') elif type(nodes) == list: for node in nodes: ff.write(str(node)+'\n') else: raise TypeError(f'nodes must be int or list. Received instead: {nodes}') # Not strictly needed (MPIExecutor with n=1 is currently used by rsopt to simplify setup)
[docs]class SerialExecutor(Executor): def __init__(self): super().__init__() self._launch_with_retries = MPIExecutor._launch_with_retries self.max_launch_attempts = 5 self.fail_time = 2 self.retry_delay_incr = 5
[docs] def add_comm_info(self, libE_nodes, serial_setup): """Adds comm-specific information to executor. Updates resources information if auto_resources is true. """ self.resources.add_comm_info(libE_nodes=libE_nodes)
# if serial_setup: # self._serial_setup()
[docs] def set_worker_info(self, comm, workerid=None): """Sets info for this executor""" self.workerID = workerid if not self.resources: self.resources = Resources() self.resources.set_worker_resources(self.workerID, comm)
[docs] def submit(self, calc_type=None, app_name=None, app_args=None, stdout=None, stderr=None, dry_run=False, wait_on_start=False): if app_name is not None: app = self.get_app(app_name) elif calc_type is not None: app = self.default_app(calc_type) else: raise ExecutorException("Either app_name or calc_type must be set") default_workdir = os.getcwd() task = Task(app, app_args, default_workdir, stdout, stderr, self.workerID) runline = [] runline.extend(task.app.full_path.split()) if task.app_args is not None: runline.extend(task.app_args.split()) task.runline = ' '.join(runline) # Allow to be queried if dry_run: task.dry_run = True logger.info('Test (No submit) Runline: {}'.format(' '.join(runline))) task._set_complete(dry_run=True) else: # Launch Task self._launch_with_retries(task, runline, subgroup_launch=False, wait_on_start=wait_on_start) if not task.timer.timing: task.timer.start() task.submit_time = task.timer.tstart # Time not date - may not need if using timer. self.list_of_tasks.append(task) return task