Dahu Package

You can have a look at the man pages of the CLi tool: dahu-reprocess and dahu_server which is the Tang-device server.

This is the public API of dahu.

dahu.job

Data Analysis RPC server over Tango:

Contains the Job class which handles jobs. A static part of the class contains statistics of the class

class Job(name='plugin.Plugin', input_data={})[source]

Bases: threading.Thread

Class Job

  • Each instance will be a job

  • Constructor takes an input data and generates the JobId

  • Each instance will gave a “getOutput” method with optional join

  • there could be a “join” method, waiting for the job to finish

  • Each instance will have a “execute” method and returning a JobId

  • Each instance will have a “setCallBack” method that stores the name of the external callback

  • provide status of a job

  • Each instance has an abort method which can be used to stop processing (or a server)

Static part: * keeps track of all jobs status * leave the time to job to initialize * static class retrieve job-instance, status, small-log … * does not manage workload of the computer, should be managed at the ExecPlugin level

Used for the tango binding

== class variables == dictPluginStatus[pluginName] = [“uninitialized”|”running”|”executed”|”failed”] dictJobs [JobId] = Job.Instance

== static methods == getJob(JobId)

RESERVED keywords from Thread: start, run, join, name, ident, is_alive, daemon

start is overridden with a call to the factory to instanciate the plugin

STATE_INEXISTANT = 'inexistant'
STATE_UNINITIALIZED = 'uninitialized'
STATE_STARTING = 'starting'
STATE_RUNNING = 'running'
STATE_SUCCESS = 'success'
STATE_FAILURE = 'failure'
STATE_ABORTED = 'aborted'
STATE = ['uninitialized', 'starting', 'running', 'success', 'failure', 'aborted']
start()[source]

We need to create the plugin before starting the new tread…

join(timeout=None)[source]

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

abort()[source]

Tell the job to stop !

Needs to be implemented into the plugin !

run()[source]

Defines the sequence of execution of the plugin 1) the the state to “running” 2) sets the input data to the plugin 3) run the set-up 4) run the process 4) run the tear-down: always runs tear-down ! 5) run the call-backs

connect_callback(method=None)[source]
Parameters

method – function or method to be called - back

clean(force=False, wait=True)[source]

Frees the memory associated with the plugin

Parameters
  • force – Force garbage collection after clean-up

  • wait – wait for job to be finished

synchronize(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

property id
Returns

JobId

@rtype: integer

property plugin
Returns

the processing instance

@rtype: python object

property status
Returns

status of the Job

@rtype: string

property input_data

Returns the job input data

property output_data

Returns the job output data :param _bWait: shall we wait for the plugin to finish to retrieve output data: Yes by default. :type _bWait: boolean

getName()[source]
setName(name)[source]
property name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

classmethod synchronize_all()[source]

Wait for all jobs to finish.

classmethod synchronize_job(jobId, timeout=None)[source]

Wait for all a specific jobs to finish.

Parameters
  • jobId – identifier of the job … intg

  • timeout – timeout in second to wait

Returns

status of the job

classmethod getStatusFromID(jobId)[source]

Retrieve the job (hence the plugin) status

Parameters

jobId (int) – the Job identification number

Returns

the Job status

@rtype: string

classmethod getStatusFromId(jobId)

Retrieve the job (hence the plugin) status

Parameters

jobId (int) – the Job identification number

Returns

the Job status

@rtype: string

classmethod getJobFromID(jobId)[source]

Retrieve the job (hence the plugin)

Parameters

jobId – the Job identification number

Returns

the “Job instance”, which contains the plugin and the status

@rtype: a Python object, instance of Job.

classmethod getJobFromId(jobId)

Retrieve the job (hence the plugin)

Parameters

jobId – the Job identification number

Returns

the “Job instance”, which contains the plugin and the status

@rtype: a Python object, instance of Job.

classmethod cleanJobfromId(jobId, forceGC=True)[source]

Frees the memory associated with the top level plugin

Parameters
  • jobId (int) – the Job identification number

  • forceGC (boolean) – Force garbage collection after clean-up

classmethod cleanJobfromID(jobId, forceGC=True)

Frees the memory associated with the top level plugin

Parameters
  • jobId (int) – the Job identification number

  • forceGC (boolean) – Force garbage collection after clean-up

classmethod getDataOutputFromId(jobId, as_JSON=False)[source]

Returns the Plugin Output Data :param jobId: job idenfier :type jobId: int :return: Job.DataOutput JSON string

classmethod getDataOutputFromID(jobId, as_JSON=False)

Returns the Plugin Output Data :param jobId: job idenfier :type jobId: int :return: Job.DataOutput JSON string

classmethod getDataInputFromId(jobId, as_JSON=False)[source]

Returns the Plugin Input Data :param jobId: job idenfier :type jobId: int :return: Job.DataInput JSON string

classmethod getDataInputFromID(jobId, as_JSON=False)

Returns the Plugin Input Data :param jobId: job idenfier :type jobId: int :return: Job.DataInput JSON string

classmethod getErrorFromId(jobId)[source]

Returns the error messages from plugin :param jobId: job idenfier :type jobId: int :return: error message as a string

classmethod getErrorFromID(jobId)

Returns the error messages from plugin :param jobId: job idenfier :type jobId: int :return: error message as a string

classmethod stats()[source]

Retrieve some statistics and print them

dahu.plugin

Data Analysis RPC server over Tango:

Definiton of plugins

class Plugin[source]

Bases: object

A plugin is instanciated

  • Gets its input parameters as a dictionary from the setup method

  • Performs some work in the process

  • Sets the result as output attribute, should be a dictionary

  • The process can be an infinite loop or a server which can be aborted using the abort method

DEFAULT_SET_UP = 'setup'
DEFAULT_PROCESS = 'process'
DEFAULT_TEAR_DOWN = 'teardown'
DEFAULT_ABORT = 'abort'
REPROCESS_IGNORE = []
get_name()[source]
setup(kwargs=None)[source]

This is the second constructor to setup input variables and possibly initialize some objects

process()[source]

main processing of the plugin

teardown()[source]

method used to tear-down the plugin (close connection, files)

This is always run, even if process fails

get_info()[source]
abort()[source]

Method called to stop a server process

log_error(txt, do_raise=True)[source]

Way to log errors and raise error

log_warning(txt)[source]

Way to log warning

wait_for(job_id)[source]

Wait for another job to be finished …

Parameters

job_id – identifier for the job

Returns

the job object

class PluginFromFunction[source]

Bases: dahu.plugin.Plugin

Template class to build a plugin from a function

process()[source]

main processing of the plugin

plugin_from_function(function)[source]

Create a plugin class from a given function and registers it into the

Parameters

function – any function

Returns

plugin name to be used by the plugin_factory to get an instance

dahu.factory

Data Analysis RPC server over Tango:

Factory for the loading of plugins

load_source(module_name, file_path)[source]

Plugin loader which does not pollute sys.module

class Factory(workdir=None, plugin_path=None)[source]

Bases: object

This is a factory, it instanciates a plugin from it name

registry = {}
modules = {}
plugin_dirs = {'/users/kieffer/workspace-400/dahu/build/lib/dahu/plugins': ['__init__', 'id15v2', 'id02', 'pyfai', 'id02', 'id31', 'example', 'id15', 'sandbox', 'bm29', 'focus']}
reg_sem = <threading.Semaphore object>
add_directory(directory)[source]
search_plugin(plugin_name)[source]

Search for a given plugins … starting from the FQN package.class,

classmethod register(klass, fqn=None)[source]

Register a class as a plugin which can be instanciated.

This can be used as a decorator

@plugin_factor.register

@param klass: class to be registered as a plugin @param fqn: fully qualified name @return klass

register(klass, fqn=None)

Register a class as a plugin which can be instanciated.

This can be used as a decorator

@plugin_factor.register

@param klass: class to be registered as a plugin @param fqn: fully qualified name @return klass

dahu.server

class DahuDS(cl, name)[source]

Bases: tango.device_server.LatestDeviceImpl

Tango device server launcher for Dahu server.

get_name()[source]

Returns the name of the class

delete_device(self) → None[source]

Delete the device.

Parameters : None Return : None

init_device(self) → None[source]

Intialize the device.

Parameters : None Return : None

always_executed_hook(self) → None[source]

Hook method. Default method to implement an action necessary on a device before any command is executed. This method can be redefined in sub-classes in case of the default behaviour does not fullfill the needs

Parameters : None Return : None

Throws : DevFailed This method does not throw exception but a redefined method can.

read_attr_hardware(self, attr_list) → None[source]

Read the hardware to return attribute value(s). Default method to implement an action necessary on a device to read the hardware involved in a a read attribute CORBA call. This method must be redefined in sub-classes in order to support attribute reading

Parameters :
attr_list(sequence<int>) list of indices in the device object attribute vector

of an attribute to be read.

Return : None

Throws : DevFailed This method does not throw exception but a redefined method can.

read_jobSuccess(attr)[source]
read_jobFailure(attr)[source]
read_statisticsCollected(attr)[source]
read_serialize(attr)[source]
write_serialize(attr)[source]
getJobState(jobId)[source]
cleanJob(jobId)[source]
listPlugins()[source]

List all plugin currently loaded …. with a brief description

initPlugin(name)[source]

Creates a job with the given plugin

abort(jobId)[source]

Aborts a job

@param jobId: ID of the job to stop

quitDahu()[source]
startJob(argin)[source]

Starts a job

@param argin: 2-list [<Dahu plugin to execute>, <JSON serialized dict>] @return: jobID which is an int (-1 for error)

process_job()[source]

Process all jobs in the queue.

finished_processing(job)[source]

callback: when processing is done

@param job: instance of dahu.job.Job

process_event()[source]

process finished jobs on the tango side (issue with tango locks)

collectStatistics()[source]

Retrieve some statistics on all Dahu-Jobs @return: a page of information about Dahu-jobs

statistics()[source]

retrieve some statistics about past jobs.

getStatistics()[source]

just return statistics previously calculated

getJobOutput(jobId)[source]

Retrieve XML output form a job @param jobId: name of the job @return: output from a job

getJobInput(jobId)[source]

Retrieve input from a job as JSON string @param jobId: identifier of the job (int) @return: JSON serialized input from a job

getJobError(jobId)[source]

Retrieve error message from a job as a string @param jobId: identifier of the job (int) @return: Error message

waitJob(jobId)[source]

Wait for a job to be finished and returns the status. May cause Tango timeout if too slow to finish …. May do polling to wait the job actually started

@param jobId: identifier of the job (int) @return: status of the job

class DahuDSClass(name)[source]

Bases: tango._tango.DeviceClass

class_property_list = {}
device_property_list = {'plugins_directory': [tango._tango.CmdArgType.DevString, 'Dahu plugins directory', []]}
cmd_list = {'abort': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevBoolean, '']], 'cleanJob': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'Message']], 'collectStatistics': [[tango._tango.CmdArgType.DevVoid, 'nothing needed'], [tango._tango.CmdArgType.DevVoid, 'Collect some statistics about jobs within Dahu']], 'getJobError': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'Error message']], 'getJobInput': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, '<JSON serialized dict>']], 'getJobOutput': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, '<JSON serialized dict>']], 'getJobState': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'job state']], 'getStatistics': [[tango._tango.CmdArgType.DevVoid, 'nothing needed'], [tango._tango.CmdArgType.DevString, 'Retrieve statistics about Dahu-jobs']], 'initPlugin': [[tango._tango.CmdArgType.DevString, 'plugin name'], [tango._tango.CmdArgType.DevString, 'Message']], 'listPlugins': [[tango._tango.CmdArgType.DevVoid, 'nothing needed'], [tango._tango.CmdArgType.DevString, 'prints the list of all plugin classes currently loaded']], 'startJob': [[tango._tango.CmdArgType.DevVarStringArray, '[<Dahu plugin to execute>, <JSON serialized dict>]'], [tango._tango.CmdArgType.DevLong, 'job id']], 'waitJob': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'job state']]}
attr_list = {'jobFailure': [[tango._tango.CmdArgType.DevLong, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ]], 'jobSuccess': [[tango._tango.CmdArgType.DevLong, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ]], 'serialize': [[tango._tango.CmdArgType.DevBoolean, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ_WRITE]], 'statisticsCollected': [[tango._tango.CmdArgType.DevString, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ]]}

dahu.cache

Data Analysis RPC server over Tango:

Class Cache for storing the data in a Borg

class DataCache(max_size=10)[source]

Bases: dict

This class is a Borg : always returns the same values regardless to the instance of the object it is used as data storage for images … with a limit on the number of images to keep in memory.

has_key(key)

D.__contains__(k) -> True if D has a key k, else False

get(key, default=None)[source]

get method with default answer implemented

keys()[source]

Returns the list of keys, ordered

pop(key)[source]

Remove a key for the dictionary and return it’s value

dahu.utils