Dahu Package
You can have a look at the man pages of the CLI tools: dahu-reprocess and dahu_server, which is the Tango device server.
This is the public API of dahu.
dahu.job
Data Analysis RPC server over Tango:
Contains the Job class, which handles jobs. A static part of the class keeps statistics about all jobs.
- class Job(name='plugin.Plugin', input_data={})[source]
Bases: Thread
Class Job
Each instance will be a job
The constructor takes input data and generates the JobId
Each instance will have a "getOutput" method with an optional join
There could be a "join" method, waiting for the job to finish
Each instance will have an "execute" method returning a JobId
Each instance will have a "setCallBack" method that stores the name of the external callback
Each instance provides the status of the job
Each instance has an abort method which can be used to stop processing (or a server)
Static part:
- keeps track of the status of all jobs
- leaves time for the job to initialize
- the static class retrieves job-instance, status, small-log …
- does not manage the workload of the computer, which should be managed at the ExecPlugin level

A short usage sketch of the job lifecycle is given at the end of this overview.
Used for the tango binding
== class variables ==
dictPluginStatus[pluginName] = ["uninitialized" | "running" | "executed" | "failed"]
dictJobs[JobId] = Job.Instance

== static methods ==
getJob(JobId)
RESERVED keywords from Thread: start, run, join, name, ident, is_alive, daemon
start is overridden with a call to the factory to instantiate the plugin
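A minimal usage sketch of this lifecycle, assuming a plugin named "example.square" is available on the plugin path (the plugin name is hypothetical):

    from dahu.job import Job

    # Create a job for a (hypothetical) plugin together with its input data
    job = Job(name="example.square", input_data={"x": 5})
    job.start()             # inherited from Thread; instantiates the plugin via the factory
    job.join(timeout=10.0)  # wait (at most 10 s) for the job to finish

    print(job.id)           # the JobId (integer)
    print(job.status)       # one of the STATE values, e.g. "success"
    print(job.output_data)  # the output dictionary produced by the plugin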
- STATE_INEXISTANT = 'inexistant'
- STATE_UNINITIALIZED = 'uninitialized'
- STATE_STARTING = 'starting'
- STATE_RUNNING = 'running'
- STATE_SUCCESS = 'success'
- STATE_FAILURE = 'failure'
- STATE_ABORTED = 'aborted'
- STATE = ['uninitialized', 'starting', 'running', 'success', 'failure', 'aborted']
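The terminal states can be checked against these constants, for instance (a small sketch reusing the job object from the example above):

    # Has the job reached a terminal state?
    if job.status in (Job.STATE_SUCCESS, Job.STATE_FAILURE, Job.STATE_ABORTED):
        print("job %s finished with status %s" % (job.id, job.status))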
- join(timeout=None)[source]
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
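A short sketch of the timeout handling described above (the 5-second timeout is arbitrary):

    job.join(timeout=5.0)
    if job.is_alive():
        # join() timed out: the job is still running
        print("job %s is still running" % job.id)
    else:
        print("job %s terminated with status %s" % (job.id, job.status))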
- run()[source]
Defines the sequence of execution of the plugin:
1) sets the state to "running"
2) sets the input data to the plugin
3) runs the set-up
4) runs the process
5) runs the tear-down: the tear-down always runs!
6) runs the call-backs
- clean(force=False, wait=True)[source]
Frees the memory associated with the plugin
- Parameters:
force – Force garbage collection after clean-up
wait – wait for job to be finished
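For example, to release the plugin once the job is done and force a garbage collection (a sketch):

    job.clean(force=True, wait=True)  # wait for the job, free the plugin, then force a GC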
- synchronize(timeout=None)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
- property id
- Returns:
JobId
- Return type:
integer
- property plugin
- Returns:
the processing instance
- Return type:
python object
- property status
- Returns:
status of the Job
- Return type:
string
- property input_data
Returns the job input data
- property output_data
Returns the job output data
- Parameters:
_bWait (boolean) – shall we wait for the plugin to finish to retrieve the output data? Yes by default.
- getName()[source]
Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
- setName(name)[source]
Set the name string for this thread.
This method is deprecated, use the name attribute instead.
- property name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
- classmethod synchronize_job(jobId, timeout=None)[source]
Wait for a specific job to finish.
- Parameters:
jobId (int) – identifier of the job
timeout – timeout in seconds to wait
- Returns:
status of the job
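A sketch of class-level synchronization, assuming a job identifier jobid obtained earlier (e.g. from job.id):

    # Wait up to 10 s for the job to finish and retrieve its final status
    status = Job.synchronize_job(jobid, timeout=10.0)
    print("job %s ended with status %s" % (jobid, status))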
- classmethod getStatusFromID(jobId)[source]
Retrieve the job (hence the plugin) status
- Parameters:
jobId (int) – the Job identification number
- Returns:
the Job status
- Return type:
string
- classmethod getStatusFromId(jobId)
Retrieve the job (hence the plugin) status
- Parameters:
jobId (int) – the Job identification number
- Returns:
the Job status
- Return type:
string
- classmethod getJobFromID(jobId)[source]
Retrieve the job (hence the plugin)
- Parameters:
jobId – the Job identification number
- Returns:
the “Job instance”, which contains the plugin and the status
- Return type:
a Python object, instance of Job.
- classmethod getJobFromId(jobId)
Retrieve the job (hence the plugin)
- Parameters:
jobId – the Job identification number
- Returns:
the “Job instance”, which contains the plugin and the status
- Return type:
a Python object, instance of Job.
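A small sketch of these class-level lookups, again assuming a job identifier jobid:

    status = Job.getStatusFromId(jobid)  # status string, e.g. "running"
    job = Job.getJobFromId(jobid)        # the Job instance (its plugin is available as job.plugin)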
- classmethod cleanJobfromId(jobId, forceGC=True)[source]
Frees the memory associated with the top level plugin
- Parameters:
jobId (int) – the Job identification number
forceGC (boolean) – Force garbage collection after clean-up
- classmethod cleanJobfromID(jobId, forceGC=True)
Frees the memory associated with the top level plugin
- Parameters:
jobId (int) – the Job identification number
forceGC (boolean) – Force garbage collection after clean-up
- classmethod getDataOutputFromId(jobId, as_JSON=False)[source]
Returns the Plugin Output Data
- Parameters:
jobId (int) – job identifier
- Returns:
Job.DataOutput JSON string
- classmethod getDataOutputFromID(jobId, as_JSON=False)
Returns the Plugin Output Data
- Parameters:
jobId (int) – job identifier
- Returns:
Job.DataOutput JSON string
- classmethod getDataInputFromId(jobId, as_JSON=False)[source]
Returns the Plugin Input Data
- Parameters:
jobId (int) – job identifier
- Returns:
Job.DataInput JSON string
- classmethod getDataInputFromID(jobId, as_JSON=False)
Returns the Plugin Input Data
- Parameters:
jobId (int) – job identifier
- Returns:
Job.DataInput JSON string
- classmethod getErrorFromId(jobId)[source]
Returns the error messages from the plugin
- Parameters:
jobId (int) – job identifier
- Returns:
error message as a string
- classmethod getErrorFromID(jobId)
Returns the error messages from the plugin
- Parameters:
jobId (int) – job identifier
- Returns:
error message as a string
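A sketch combining these accessors, assuming a job identifier jobid of a finished job:

    output_json = Job.getDataOutputFromId(jobid, as_JSON=True)  # output data as a JSON string
    input_data = Job.getDataInputFromId(jobid)                  # input data of the job
    error_msg = Job.getErrorFromId(jobid)                       # error messages, if any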
dahu.plugin
Data Analysis RPC server over Tango:
Definition of plugins
- class Plugin[source]
Bases: object
A plugin is instantiated
Gets its input parameters as a dictionary from the setup method
Performs some work in the process
Sets the result as output attribute, should be a dictionary
The process can be an infinite loop or a server which can be aborted using the abort method
- DEFAULT_SET_UP = 'setup'
- DEFAULT_PROCESS = 'process'
- DEFAULT_TEAR_DOWN = 'teardown'
- DEFAULT_ABORT = 'abort'
- REPROCESS_IGNORE = []
- setup(kwargs=None)[source]
This is the second constructor, used to set up input variables and possibly initialize some objects
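A minimal plugin sketch following this description; the self.input and self.output attribute names are assumptions based on the text above (the base class is expected to store the input dictionary and to read the result back from the output attribute):

    from dahu.plugin import Plugin

    class Square(Plugin):
        """Hypothetical plugin that squares its "x" input."""

        def process(self):
            # self.input / self.output are assumed to be the dictionaries
            # described above: input parameters and result of the plugin
            x = self.input.get("x", 0)
            self.output["result"] = x * x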
dahu.factory
Data Analysis RPC server over Tango:
Factory for the loading of plugins
- class Factory(workdir=None, plugin_path=None)[source]
Bases: object
This is a factory: it instantiates a plugin from its name
- registry = {}
- modules = {}
- plugin_dirs = {'/home/tester/dahu/build/lib/python3/dist-packages/dahu/plugins': ['id27', 'id15v2', 'id02', 'bm29', 'focus', 'id15', 'id31', 'example', 'pyfai']}
- reg_sem = <threading.Semaphore at 0x7f52e8c9e7b0: value=1>
- register(klass, fqn=None)
Register a class as a plugin which can be instantiated.
This can be used as a decorator
@plugin_factory.register
- Parameters:
klass – class to be registered as a plugin
fqn – fully qualified name
- Returns:
klass
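A sketch of the decorator usage, assuming the module exposes a Factory instance named plugin_factory (as the example above suggests):

    from dahu.factory import plugin_factory
    from dahu.plugin import Plugin

    @plugin_factory.register
    class Mirror(Plugin):
        """Hypothetical plugin registered so that the factory can instantiate it by name."""

        def process(self):
            # self.input / self.output are assumptions, as in the plugin sketch above
            self.output["echo"] = self.input.get("message", "")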
dahu.server
dahu.cache
Data Analysis RPC server over Tango:
Class Cache for storing the data in a Borg
- class DataCache(max_size=10, borg=True)[source]
Bases: dict
This class behaves like a dict with a finite size, used to cache some data for some time, then discard it when other things are stored.
This class can be configured as a borg (singleton-like): it always returns the same values regardless of the instance of the object.
- has_key(key)
D.__contains__(k) -> True if D has a key k, else False
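A small sketch of the behaviour described above:

    from dahu.cache import DataCache

    cache = DataCache(max_size=3)    # keeps at most 3 entries; older ones are discarded when full
    cache["frame_1"] = "some data"
    print(cache.has_key("frame_1"))  # True

    # With borg=True (the default), every instance shares the same content
    other = DataCache(max_size=3)
    print("frame_1" in other)        # True: same underlying storage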