#!/usr/bin/env python3
# coding: utf-8
"""
Data Analysis RPC server over Tango: Tango device server
"""
from __future__ import with_statement, print_function, absolute_import, division

__author__ = "Jérôme Kieffer"
__contact__ = "Jerome.Kieffer@ESRF.eu"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "09/07/2021"
__status__ = "production"
__docformat__ = 'restructuredtext'
import sys
import os
import json
import threading
import logging
import time
import types
import multiprocessing
import six
if six.PY2:
    from Queue import Queue
else:
    from queue import Queue
logger = logging.getLogger("dahu.server")
# set loglevel at least at INFO
if logger.getEffectiveLevel() > logging.INFO:
    logger.setLevel(logging.INFO)
import PyTango
from .job import Job, plugin_factory
try:
    from rfoo.utils import rconsole
    rconsole.spawn_server()
except ImportError:
    logger.debug("No socket opened for debugging -> please install rfoo")

class DahuDS(PyTango.LatestDeviceImpl):
    """
    Tango device server launcher for the Dahu server.
    """

    def __init__(self, cl, name):
        PyTango.LatestDeviceImpl.__init__(self, cl, name)
        self.init_device()
        self.job_queue = Queue()  # queue containing jobs to process
        self.event_queue = Queue()  # queue containing finished jobs
        self.processing_lock = threading.Semaphore()
        self.stat_lock = threading.Semaphore()
        self.last_stats = "No statistics collected yet, please use the 'collectStatistics' method first"
        self.last_failure = -1
        self.last_success = -1
        self.statistics_threads = None
        self._serialize = False
        # self._ncpu_sem = threading.Semaphore(multiprocessing.cpu_count())
        # start the two threads related to the queues: process_job and process_event
        t2 = threading.Thread(target=self.process_job)
        t2.start()
        t1 = threading.Thread(target=self.process_event)
        t1.start()

    def get_name(self):
        """Returns the name of the class"""
        return self.__class__.__name__

    def delete_device(self):
        logger.debug("[Device delete_device method] for device %s" % self.get_name())

    def init_device(self):
        logger.debug("In %s.init_device()" % self.get_name())
        self.set_state(PyTango.DevState.ON)
        self.get_device_properties(self.get_device_class())
        self.set_change_event("jobSuccess", True, False)
        self.set_change_event("jobFailure", True, False)
        self.set_change_event("statisticsCollected", True, False)

    def always_executed_hook(self):
        pass

    def read_attr_hardware(self, data):
        logger.debug("In %s.read_attr_hardware()" % self.get_name())

    def read_jobSuccess(self, attr):
        attr.set_value(self.last_success)

    def read_jobFailure(self, attr):
        attr.set_value(self.last_failure)

    def read_statisticsCollected(self, attr):
        attr.set_value(self.last_stats)

    def read_serialize(self, attr):
        attr.set_value(bool(self._serialize))

    def write_serialize(self, attr):
        # get_write_value is a method: without the call, the truthy bound
        # method would make the attribute always evaluate to True
        self._serialize = bool(attr.get_write_value())

    def getJobState(self, jobId):
        return Job.getStatusFromID(jobId)

    def cleanJob(self, jobId):
        return Job.cleanJobFromID(jobId)

    def listPlugins(self):
        """
        List all plugins currently loaded, with a brief description of each.
        """
        logger.debug("In %s.listPlugins" % (self.get_name()))
        res = ["List of all plugins currently loaded (use initPlugin to load additional plugins):"]
        plugins = sorted(plugin_factory.registry.keys())
        # guard against plugins without a docstring
        return os.linesep.join(res + [" %s : %s" % (i, (plugin_factory.registry[i].__doc__ or "").split("\n")[0]) for i in plugins])

    def initPlugin(self, name):
        """
        Instantiate the named plugin and return a status message.
        """
        logger.debug("In %s.initPlugin(%s)" % (self.get_name(), name))
        err = None
        plugin = None
        try:
            plugin = plugin_factory(name)
        except Exception as error:
            err = "plugin %s failed to be instantiated: %s" % (name, error)
            logger.error(err)
        if plugin is None or err:
            return "Plugin not found: %s, error: %s" % (name, err)
        else:
            return "Plugin loaded: %s%s%s" % (name, os.linesep, plugin.__doc__)

    def abort(self, jobId):
        """
        Aborts a job (not yet implemented).

        @param jobId: ID of the job to stop
        """
        pass

    def quitDahu(self):
        logger.debug("In %s.quitDahu()" % self.get_name())
        logger.info("Quitting DahuDS")
        sys.exit()

    def startJob(self, argin):
        """
        Starts a job.

        @param argin: 2-list [<Dahu plugin to execute>, <JSON serialized dict>]
        @return: jobID which is an int (-1 for error)
        """
        logger.debug("In %s.startJob()" % self.get_name())
        name, data_input = argin[:2]
        if data_input.strip() == "":
            return -1
        job = Job(name, data_input)
        if job is None:
            return -1
        self.job_queue.put(job)
        return job.id
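
    # Hedged client-side sketch (not part of the server): submitting a job from
    # a Tango client typically looks like the snippet below; the device name
    # "id00/dahu/1" and the plugin name "example.square" are hypothetical.
    #
    #     import json
    #     import PyTango
    #     dahu = PyTango.DeviceProxy("id00/dahu/1")  # hypothetical device name
    #     job_id = dahu.startJob(["example.square", json.dumps({"x": 5})])
    #     print(dahu.waitJob(job_id))
    #     print(dahu.getJobOutput(job_id))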

    def process_job(self):
        """
        Process all jobs in the queue, forever.
        """
        while True:
            job = self.job_queue.get()
            job.connect_callback(self.finished_processing)
            job.start()
            if self._serialize:
                job.join()

    def finished_processing(self, job):
        """
        Callback invoked when processing is done.

        @param job: instance of dahu.job.Job
        """
        logger.debug("In %s.finished_processing id:%s (%s)" % (self.get_name(), job.id, job.status))
        # self._ncpu_sem.release()
        job.clean(wait=False)
        if job.status == job.STATE_SUCCESS:
            self.last_success = job.id
        else:
            sys.stdout.flush()
            sys.stderr.flush()
            self.last_failure = job.id
        # every finished job, successful or not, is forwarded to the event queue
        self.job_queue.task_done()
        self.event_queue.put(job)

    def process_event(self):
        """
        Process finished jobs on the Tango side (issue with Tango locks).
        """
        while True:
            job = self.event_queue.get()
            if job.status == job.STATE_SUCCESS:
                self.push_change_event("jobSuccess", job.id)
            else:
                self.push_change_event("jobFailure", job.id)

    # TODO one day:
    # def getRunning(self):
    #     """
    #     retrieve the list of plugins currently under execution (with their plugin-Id)
    #     """
    #     return EDStatus.getRunning()
    #
    # def getSuccess(self):
    #     """
    #     retrieve the list of plugins finished with success (with their plugin-Id)
    #     """
    #     return EDStatus.getSuccess()
    #
    # def getFailure(self):
    #     """
    #     retrieve the list of plugins finished with failure (with their plugin-Id)
    #     """
    #     return EDStatus.getFailure()

    def collectStatistics(self):
        """
        Collect statistics on all Dahu jobs in a background thread.

        The result is pushed as a "statisticsCollected" change event and can
        also be retrieved with getStatistics.
        """
        self.statistics_threads = threading.Thread(target=self.statistics)
        self.statistics_threads.start()

    def statistics(self):
        """
        Retrieve some statistics about past jobs.
        """
        with self.stat_lock:
            fStartStat = time.time()
            self.last_stats = Job.stats()
            self.last_stats += os.linesep + "Statistics collected on %s, the collect took: %.3fs" % (time.asctime(), time.time() - fStartStat)
            self.push_change_event("statisticsCollected", self.last_stats)

    def getStatistics(self):
        """
        Return the statistics previously calculated, waiting for any
        running collection to finish first.
        """
        if self.statistics_threads:
            self.statistics_threads.join()
        return self.last_stats

    def getJobOutput(self, jobId):
        """
        Retrieve the JSON output from a job.

        @param jobId: identifier of the job (int)
        @return: output of the job, JSON-serialized
        """
        return Job.getDataOutputFromId(jobId, as_JSON=True)

    def getJobError(self, jobId):
        """
        Retrieve the error message from a job as a string.

        @param jobId: identifier of the job (int)
        @return: error message
        """
        return Job.getErrorFromId(jobId)
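
    # Hedged addition: cmd_list in DahuDSClass below declares a "getJobInput"
    # command, but no matching method appeared in this section. Assuming the
    # Job class exposes getDataInputFromId symmetrically to getDataOutputFromId,
    # a minimal implementation would be:
    def getJobInput(self, jobId):
        """
        Retrieve the JSON input of a job.

        @param jobId: identifier of the job (int)
        @return: input of the job, JSON-serialized
        """
        # assumption: Job.getDataInputFromId mirrors Job.getDataOutputFromId
        return Job.getDataInputFromId(jobId, as_JSON=True)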

    def waitJob(self, jobId):
        """
        Wait for a job to finish and return its status.

        May cause a Tango timeout if the job is too slow to finish; polls
        for about a second in case the job has not actually started yet.

        @param jobId: identifier of the job (int)
        @return: status of the job
        """
        res = Job.synchronize_job(jobId)
        i = 0
        while res == Job.STATE_UNINITIALIZED:
            if i > 10:
                break
            i += 1
            time.sleep(0.1)
            res = Job.synchronize_job(jobId)
        return res
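
# Hedged sketch of the asynchronous client pattern (not part of the server):
# instead of blocking in waitJob, a client can subscribe to the "jobSuccess"
# and "jobFailure" change events pushed by process_event; the device name
# below is hypothetical.
#
#     import PyTango
#     dahu = PyTango.DeviceProxy("id00/dahu/1")  # hypothetical device name
#
#     def on_success(event):
#         if not event.err:
#             print("job finished:", event.attr_value.value)
#
#     eid = dahu.subscribe_event("jobSuccess", PyTango.EventType.CHANGE_EVENT, on_success)
#     # ... submit jobs with startJob() ...
#     dahu.unsubscribe_event(eid)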

class DahuDSClass(PyTango.DeviceClass):
    # Class Properties
    class_property_list = {
        }

    # Device Properties
    device_property_list = {
        'plugins_directory':
            [PyTango.DevString,
             "Dahu plugins directory",
             []],
        }

    # Command definitions
    cmd_list = {
        'startJob': [[PyTango.DevVarStringArray, "[<Dahu plugin to execute>, <JSON serialized dict>]"], [PyTango.DevLong, "job id"]],
        'abort': [[PyTango.DevLong, "job id"], [PyTango.DevBoolean, ""]],
        'getJobState': [[PyTango.DevLong, "job id"], [PyTango.DevString, "job state"]],
        'initPlugin': [[PyTango.DevString, "plugin name"], [PyTango.DevString, "Message"]],
        'cleanJob': [[PyTango.DevLong, "job id"], [PyTango.DevString, "Message"]],
        'collectStatistics': [[PyTango.DevVoid, "nothing needed"], [PyTango.DevVoid, "Collect some statistics about jobs within Dahu"]],
        'getStatistics': [[PyTango.DevVoid, "nothing needed"], [PyTango.DevString, "Retrieve statistics about Dahu-jobs"]],
        'getJobOutput': [[PyTango.DevLong, "job id"], [PyTango.DevString, "<JSON serialized dict>"]],
        'getJobInput': [[PyTango.DevLong, "job id"], [PyTango.DevString, "<JSON serialized dict>"]],
        'getJobError': [[PyTango.DevLong, "job id"], [PyTango.DevString, "Error message"]],
        'listPlugins': [[PyTango.DevVoid, "nothing needed"], [PyTango.DevString, "prints the list of all plugin classes currently loaded"]],
        'waitJob': [[PyTango.DevLong, "job id"], [PyTango.DevString, "job state"]],
        }

    # Attribute definitions
    attr_list = {
        'jobSuccess':
            [[PyTango.DevLong,
              PyTango.SCALAR,
              PyTango.READ]],
        'jobFailure':
            [[PyTango.DevLong,
              PyTango.SCALAR,
              PyTango.READ]],
        'statisticsCollected':
            [[PyTango.DevString,
              PyTango.SCALAR,
              PyTango.READ]],
        'serialize':
            [[PyTango.DevBoolean,
              PyTango.SCALAR,
              PyTango.READ_WRITE]],
        }

    def __init__(self, name):
        PyTango.DeviceClass.__init__(self, name)
        self.set_type(name)
        logger.debug("In DahuDSClass constructor")