Source code for saberx.executers.threaddriver

"""
.. module:: threaddriver
   :synopsis: Module for spawning threads and executing groups.
"""

from saberx.executers.groupexecuter import GroupExecuter
import threading
import os

[docs]class ThreadExecuter: """ **Class for spawning and managing threads for executing groups** """ def __init__(self, **kwargs): """ **Init method to initialise the object created from this class.** """ self.__groups = kwargs.get("groups") self.__lock = threading.Lock() self.__workers = [] self.__logger = kwargs.get("logger") def __aquire_lock(self): """ **Method to aquire lock** This method is called to aquire lock before the threads are spwaned. The lock verifies that the previuos run has ended completely before the next run begins Returns: bool : Successfully aquired lock or not. """ try: # try to aquire the lock only if there is no existing lock if not os.path.exists(self.__lock_file): with open(self.__lock_file, "w") as lock_file: lock_file.write(str(os.getpid())) return True return False except Exception as e: if self.__logger: self.__logger.critical("Unable to aquire lock file : Exception : {}".format(str(e))) return False def __release_lock(self): """ **Method to release lock** This method is used to release a aquired lock. After all the threads have finished their work, this method is called to release the lock. Returns: bool : Successfully released the lock or not """ try: if os.path.exists(self.__lock_file): os.unlink(self.__lock_file) return True except Exception as e: if self.__logger: self.__logger.critical("Unable to release lock file : Exception : {}".format(str(e))) return False def __worker(self, group_id, group, logger): """ **Method to execute a group of actions** This function is the target of the thread spawned. Each thread calls this method and assigns a given group to it. Args: group_id (Integer) : Id of the group group (dict) : Dict representing groups logger (logging object) : logging object Returns: None : Returns nothing """ group_status = GroupExecuter.execute_group(group=group, thread_lock=self.__lock, logger=logger)
[docs] def spawn_workers(self, lock_file): """ ** Method to spawn threads** This method is used to spawn new threads to execute groups. Each thread calls the __worker fuction as target with a given group. Args: lock_file (string) : Path to lock file Returns; bool: Threads spawned and executed successfully or not. """ self.__lock_file = lock_file # First try to aquire lock if self.__aquire_lock(): # Iterate over the groups and spawn a thread for each group for group_index, group in enumerate(self.__groups): worker = threading.Thread(target=self.__worker, args=(group_index, group, self.__logger)) self.__workers.append(worker) worker.start() for worker in self.__workers: worker.join() # release lock when everything is done lock_released = self.__release_lock() if not lock_released: ''' Log lock issue. The lock must be released at this step or else future runs wont take place. Issue must be fixed why lock is not being released. ''' if self.__logger: self.__logger.critical("Lock could not be released. This needs to be fixed for future runs") return False # Run succeeded return True # could not aquire lock, so run wont take place. Hence send false return False