Source code for soma_workflow.schedulers.mpi_scheduler

# -*- coding: utf-8 -*-

from __future__ import print_function
from __future__ import absolute_import
from .. import scheduler
import threading
import logging
import socket
import six
from mpi4py import MPI
from .. import constants

class MPIScheduler(scheduler.Scheduler):

    '''
    Allow to submit, kill and get the status of jobs.

    A background thread ("scheduler_loop"), started by the constructor,
    serves the other ranks of the MPI communicator:

    * a JOB_REQUEST is answered with the next queued job (JOB_SENDING) or
      with NO_JOB when the queue is empty;
    * a JOB_RESULT carries the end status of jobs; failed jobs may be
      queued again (see ``nb_attempt_per_job``);
    * an EXIT_SIGNAL is counted, and the thread stops once every other rank
      of the communicator has sent one.
    '''
    parallel_job_submission_info = None

    logger = None

    is_sleeping = None

    _proc_nb = None

    _queue = None

    _jobs = None

    _processes = None

    _status = None

    _exit_info = None

    _loop = None

    _interval = None

    _lock = None

    _fail_count = None

    _logger = None

    # MPI message tags used between this thread and the worker ranks.
    JOB_REQUEST = 11
    JOB_SENDING = 12
    EXIT_SIGNAL = 13
    JOB_KILL = 14
    JOB_RESULT = 15
    NO_JOB = 16

    def __init__(self, communicator, interval=0.01, nb_attempt_per_job=1):
        '''
        * communicator *mpi4py communicator* used to talk to the other ranks
        * interval *float* stored as ``_interval``; the dispatch loop does not
          use it (it blocks in ``Probe`` instead of sleeping)
        * nb_attempt_per_job *int* total number of times a job is run before
          a non-zero return value is reported as its final result
        '''
        super(MPIScheduler, self).__init__()
        self._communicator = communicator
        self.parallel_job_submission_info = None
        # scheduler job ids waiting to be dispatched, next one first
        self._queue = []
        # job_id -> job object given to job_submission
        self._jobs = {}
        # job_id -> number of failed attempts so far
        self._fail_count = {}
        # job_id -> status constant
        self._status = {}
        # job_id -> exit info, kept until get_job_exit_info collects it
        self._exit_info = {}
        self._lock = threading.RLock()
        self.stop_thread_loop = False
        self._interval = interval
        self._nb_attempt_per_job = nb_attempt_per_job
        self._logger = logging.getLogger("testMPI")
        # number of ranks which have sent an EXIT_SIGNAL
        self._stopped_slaves = 0

        self._loop = threading.Thread(name="scheduler_loop",
                                      target=self._master_loop)
        self._loop.daemon = True
        self._loop.start()

    def _master_loop(self):
        '''Body of the dispatch thread: iterate until stop_thread_loop.'''
        while not self.stop_thread_loop:
            self._master_iteration()

    def end_scheduler_thread(self):
        '''
        Ask the dispatch thread to stop and wait for it to finish.

        The flag is only checked between iterations, and each iteration
        blocks in ``Probe`` until some message arrives.
        '''
        with self._lock:
            self.stop_thread_loop = True
        # Join outside the lock: the dispatch thread acquires self._lock after
        # every Probe, so joining while holding it could deadlock.
        self._loop.join()
        print("[host: " + socket.gethostname() + "] " +
              "Soma scheduler thread ended nicely.")

    def _master_iteration(self):
        '''Wait for one message from any rank and handle it by its tag.'''
        mpi_status = MPI.Status()
        # Probe blocks until a message is available but does NOT consume it:
        # every branch below must receive the message, otherwise the next
        # Probe would match it again.
        self._communicator.Probe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG,
                                 status=mpi_status)
        with self._lock:
            tag = mpi_status.Get_tag()
            source = mpi_status.Get_source()
            if tag == MPIScheduler.JOB_REQUEST:
                self._handle_job_request(source)
            elif tag == MPIScheduler.JOB_RESULT:
                self._handle_job_result(source)
            elif tag == MPIScheduler.EXIT_SIGNAL:
                self._handle_exit_signal(source)
            else:
                # Discard the message so the loop can make progress.
                self._communicator.recv(source=source, tag=tag)
                self._logger.critical("[host: " + socket.gethostname() +
                                      "] " + "Master unknown tag")

    def _handle_job_request(self, source):
        '''Answer the JOB_REQUEST of *source* with a job or with NO_JOB.'''
        self._communicator.recv(source=source, tag=MPIScheduler.JOB_REQUEST)
        if not self._queue:
            self._communicator.send("No job for now", dest=source,
                                    tag=MPIScheduler.NO_JOB)
            return
        self._logger.debug("[host: " + socket.gethostname() +
                           "] " + "Master send a Job !!!")
        job_id = self._queue.pop(0)
        job_list = [self._jobs[job_id]]
        self._communicator.send(job_list, dest=source,
                                tag=MPIScheduler.JOB_SENDING)
        for job in job_list:
            self._status[job.job_id] = constants.RUNNING

    def _handle_job_result(self, source):
        '''
        Record the JOB_RESULT message sent by *source*.

        The message maps job_id -> (job_status, exit_info), exit_info[1] being
        the return value. A job returning a non-zero value is put back at the
        head of the queue until it has failed nb_attempt_per_job times; then
        its status and exit info are stored.
        '''
        ended_jobs_info = self._communicator.recv(
            source=source, tag=MPIScheduler.JOB_RESULT)
        for job_id, end_info in six.iteritems(ended_jobs_info):
            job_status, exit_info = end_info
            ret_value = exit_info[1]
            try_new_attempt = False
            fails = self._fail_count.get(job_id, 0)
            if ret_value != 0 and fails < self._nb_attempt_per_job:
                fails += 1
                self._fail_count[job_id] = fails
                try_new_attempt = fails < self._nb_attempt_per_job
                self._logger.debug(
                    "[host: " + socket.gethostname() + "] " +
                    repr(fails) + " fails for job " + repr(job_id) +
                    " (ret value " + repr(ret_value) + ")")
            if try_new_attempt:
                self._queue.insert(0, job_id)
                continue
            self._exit_info[job_id] = exit_info
            self._status[job_id] = job_status
            if self._jobs[job_id].signal_end:
                # trigger the event to the engine (jobs_finished_event is
                # presumably set up by scheduler.Scheduler)
                self.jobs_finished_event.set()

    def _handle_exit_signal(self, source):
        '''Count the EXIT_SIGNAL of *source*; stop once all ranks sent one.'''
        # Consume the message so that the same rank is not counted again.
        self._communicator.recv(source=source, tag=MPIScheduler.EXIT_SIGNAL)
        self._stopped_slaves += 1
        if self._stopped_slaves >= self._communicator.size - 1:
            self.stop_thread_loop = True

    def sleep(self):
        self.is_sleeping = True

    def wake(self):
        self.is_sleeping = False

    def clean(self):
        pass

    def queued_job_count(self):
        '''Number of jobs waiting to be dispatched.'''
        return len(self._queue)

    def job_submission(self, jobs):
        '''
        * jobs *list of EngineJob*
        * return: *list of string* Job ids for the scheduling system (DRMAA
          for example); here each job's own ``job_id``.

        Raises Exception("Invalid job: no id") if a job has an empty id or
        the id -1; in that case none of the jobs is queued.
        '''
        print('MPI job_submission:', len(jobs))
        for job in jobs:
            if not job.job_id or job.job_id == -1:
                raise Exception("Invalid job: no id")
        drmaa_ids = []
        with self._lock:
            for job in jobs:
                self._queue.append(job.job_id)
                self._jobs[job.job_id] = job
                self._status[job.job_id] = constants.QUEUED_ACTIVE
                drmaa_ids.append(job.job_id)
            # Highest priority first; the sort is stable (also with
            # reverse=True), so equal priorities keep their queue order.
            self._queue.sort(key=lambda job_id: self._jobs[job_id].priority,
                             reverse=True)
        self._logger.debug("[host: " + socket.gethostname() + "] " +
                           "%d Jobs were submitted." % len(jobs))
        return drmaa_ids

    def get_job_status(self, scheduler_job_id):
        '''
        * scheduler_job_id *string* Job id for the scheduling system (DRMAA
          for example)
        * return: *string* Job status as defined in constants.JOB_STATUS

        Raises Exception("Unknown job.") if the id is not known.
        '''
        with self._lock:
            if scheduler_job_id not in self._status:
                raise Exception("Unknown job.")
            return self._status[scheduler_job_id]

    def get_job_exit_info(self, scheduler_job_id):
        '''
        * scheduler_job_id *string* Job id for the scheduling system (DRMAA
          for example)
        * return: *tuple* exit_status, exit_value, term_sig, resource_usage

        The job is forgotten afterwards: it can be asked only once, and a
        later submission with the same id starts with a fresh failure count.
        '''
        with self._lock:
            exit_info = self._exit_info.pop(scheduler_job_id)
            del self._jobs[scheduler_job_id]
            del self._status[scheduler_job_id]
            self._fail_count.pop(scheduler_job_id, None)
        return exit_info

    def kill_job(self, scheduler_job_id):
        '''
        * scheduler_job_id *string* Job id for the scheduling system (DRMAA
          for example)

        Not implemented yet: does nothing.
        '''
        # TODO
        pass

    @classmethod
    def build_scheduler(cls, config):
        '''
        Build a scheduler on MPI.COMM_WORLD. *config* is not used.

        Uses ``cls`` so that subclasses build instances of themselves.
        '''
        sch = cls(MPI.COMM_WORLD)
        return sch