Source code for dahu.plugin

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#

"""
Data Analysis RPC server over Tango: 

Definition of plugins
"""

__authors__ = ["Jérôme Kieffer"]
__contact__ = "Jerome.Kieffer@ESRF.eu"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "01/09/2020"
__status__ = "production"

import os
import logging
import cProfile
import time
from .factory import plugin_factory
from .utils import get_workdir

logger = logging.getLogger("dahu.plugin")


class Plugin(object):
    """
    Base class for all plugins.

    A plugin is instantiated with an empty constructor, then:

    * Gets its input parameters as a dictionary from the setup method
    * Performs some work in the process method
    * Sets the result as output attribute, should be a dictionary
    * The process can be an infinite loop or a server which can be aborted
      using the abort method
    """
    DEFAULT_SET_UP = "setup"  # name of the method used to set-up the plugin
    DEFAULT_PROCESS = "process"  # name of the method used to run the default processing
    DEFAULT_TEAR_DOWN = "teardown"  # name of the method used to tear-down the plugin (close connection, files)
    DEFAULT_ABORT = "abort"  # name of the method used to abort the plugin (if any. Tear_Down will be called)
    REPROCESS_IGNORE = []  # list of keys from input to be ignored when reprocessing data

    def __init__(self):
        """
        We assume an empty constructor: parameters are provided later on
        through the setup method.
        """
        self.input = {}
        self.output = {}
        self._logging = []  # stores the logging information to send back
        self.is_aborted = False
        self.__profiler = None  # cProfile.Profile instance, only created by setup()

    def get_name(self):
        """Return the name of the class of this plugin."""
        return self.__class__.__name__

    def setup(self, kwargs=None):
        """
        This is the second constructor to setup
        input variables and possibly initialize
        some objects

        :param kwargs: dictionary merged into ``self.input`` (ignored if None)
        """
        if kwargs is not None:
            self.input.update(kwargs)
        if self.input.get("do_profiling"):
            # Profiling starts here and is stopped and dumped in teardown()
            self.__profiler = cProfile.Profile()
            self.__profiler.enable()

    def process(self):
        """
        Main processing of the plugin: does nothing in the base class.
        """
        pass

    def teardown(self):
        """
        Method used to tear-down the plugin (close connection, files)

        Intended to always run, even if process fails.

        Publishes the collected log messages as ``self.output["logging"]`` and,
        when profiling was requested *and* actually started by setup(), stops
        the profiler and dumps its statistics into the working directory.
        """
        self.output["logging"] = self._logging
        profiler = self.__profiler
        # The profiler only exists when setup() ran with "do_profiling" set.
        # Without this guard, a plugin torn down without setup() (or whose
        # "do_profiling" key was added afterwards) failed with AttributeError.
        if self.input.get("do_profiling") and profiler is not None:
            profiler.disable()
            name = "%05i_%s.%s.profile" % (self.input.get("job_id", 0),
                                           self.__class__.__module__,
                                           self.__class__.__name__)
            profile_file = os.path.join(get_workdir(), name)
            # Reported through the warning channel so that the location of
            # the profile is part of the logging sent back.
            self.log_error("Profiling information in %s" % profile_file, do_raise=False)
            profiler.dump_stats(profile_file)

    def get_info(self):
        """
        Return all messages logged so far, joined with the OS line separator.
        """
        return os.linesep.join(self._logging)

    def abort(self):
        """
        Method called to stop a server process: sets the ``is_aborted`` flag.
        """
        self.is_aborted = True

    def log_error(self, txt, do_raise=True):
        """
        Way to log errors and raise error

        :param txt: message to be logged
        :param do_raise: if True (default), log as error and raise;
                         otherwise log as a warning and return
        :raises RuntimeError: with the formatted message when do_raise is True
        """
        if do_raise:
            err = "ERROR in %s: %s" % (self.get_name(), txt)
            logger.error(err)
        else:
            err = "Warning in %s: %s" % (self.get_name(), txt)
            logger.warning(err)
        # Record the message before raising so it is kept for get_info()/teardown()
        self._logging.append(err)
        if do_raise:
            raise RuntimeError(err)

    def log_warning(self, txt):
        """
        Way to log warning

        :param txt: message to be logged and stored in the plugin's logging
        """
        err = "Warning in %s: %s" % (self.get_name(), txt)
        logger.warning(err)
        self._logging.append(err)

    def wait_for(self, job_id):
        """Wait for another job to be finished ...

        The timeout is read from the ``TIMEOUT`` attribute of the plugin when
        it exists and defaults to 10 seconds.

        :param job_id: identifier for the job (integer)
        :return: the job object, as returned by ``Job.getJobFromId``
        :raises RuntimeError: via log_error, on timeout or when the other job
                              did not end in ``Job.STATE_SUCCESS``
        """
        # Imported locally -- presumably to avoid a circular import with the job module
        from .job import Job
        assert isinstance(job_id, int)
        TIMEOUT = getattr(self, 'TIMEOUT', 10.0)  # default timeout to 10s
        if job_id > Job._id_class:
            self.log_warning("Not synchronizing job, invalid id: %s" % job_id)
        else:
            status = Job.synchronize_job(job_id, TIMEOUT)
            abort_time = time.time() + TIMEOUT
            while status == Job.STATE_UNINITIALIZED:
                # Wait for job to start
                time.sleep(0.1)
                status = Job.synchronize_job(job_id, TIMEOUT)
                if time.time() > abort_time:
                    self.log_error("Timeout while waiting other job to finish")
                    # Only reached if log_error is overridden not to raise
                    break
            if status != Job.STATE_SUCCESS:
                self.log_error("Other job ended in %s: aborting myself" % status)
        return Job.getJobFromId(job_id)
class PluginFromFunction(Plugin):
    """
    Template class to build a plugin from a function.

    Concrete sub-classes are expected to provide the wrapped callable as the
    ``function`` attribute (see how ``process`` uses ``self.function``).
    """

    def __init__(self):
        """
        Empty constructor: only initializes the Plugin base class.
        """
        Plugin.__init__(self)

    def __call__(self, **kwargs):
        """
        Behaves like a normal function: for debugging

        The keyword arguments are merged into the input, then process and
        teardown are run and the "result" entry of the output is returned.
        """
        self.input.update(kwargs)
        self.process()
        self.teardown()
        return self.output["result"]

    def process(self):
        """
        Call the wrapped function with the input parameters, except the
        "job_id" and "plugin_name" keys, and store what it returns as
        ``self.output["result"]``.
        """
        if self.input is None:
            logger.warning("PluginFromFunction.process: self.input is None !!! %s", self.input)
            return
        # These two keys are stripped before the call: they are not passed to the function
        call_kwargs = {key: value
                       for key, value in self.input.items()
                       if key not in ("job_id", "plugin_name")}
        self.output["result"] = self.function(**call_kwargs)
def plugin_from_function(function):
    """
    Create a plugin class from a given function and register it into the
    plugin factory.

    The generated class derives from PluginFromFunction, is named
    ``<module>.<function name>`` and inherits the docstring of the function.

    :param function: any function
    :return: plugin name to be used by the plugin_factory to get an instance
    """
    # Lazy logger arguments: the message is only formatted if debug is enabled
    logger.debug("creating plugin from function %s", function.__name__)
    class_name = function.__module__ + "." + function.__name__
    # staticmethod prevents the function from being bound to the plugin
    # instance when accessed as self.function
    klass = type(class_name, (PluginFromFunction,),
                 {'function': staticmethod(function),
                  "__doc__": function.__doc__})
    plugin_factory.register(klass, class_name)
    return class_name