Newer
Older
tac2grc / hardcodedFiles / tacSinkBlock.c
@lukas lukas on 14 Aug 2022 38 KB initial commit
/*******************************************************************************
* E.S.O. - VLT project
*
* "@(#) $Id: tacSinkBlock.c 339094 2021-02-10 03:10:02Z jgil $"
*
* who      when      what
* -------  --------  ----------------------------------------------
* rdembet  06/08/20  New MonitorState block with no data averaging
* swehner  28/11/09  SPR20090234 Correction in rate adjustment in reset hook
* swehner  27/11/09  Fixed message queue handling
* swehner  10/10/09  New block FastProbe for unlimited rec frequency
* swehner  22/09/09  PPRS 32341 Replaced specialized tacSINK_SHMSG stack
*                    with standard vxWorks message queue. 
* bbauvir  21/09/04  VLTSW20040113 - One sample might be lost
* bbauvir  29/03/04  Monitor block averages over the reporting period
* bbauvir  29/12/02  Probe to the SampTask
* bbauvir  26/12/02  MsgQ access through DPC mechanism
* bbauvir  14/07/02  New MsgQ of TAC version 1.22
* bbauvir  21/03/01  Created
*/

/************************************************************************
*   NAME
*     tacSinkBlock - Standard data monitoring blocks
* 
*   SYNOPSIS
*     lcubootAutoCdBoot 1,"tacsink.boot"
*     < tacsink.boot
* 
*   DESCRIPTION
*     Block type: Display
*     Parameters: <sampPeriod>
*     Inputs: 1
*     Outputs: none
*
*     This function block samples the value of its input signal at the 
*     specified <sampPeriod> and issues a message containing the time
*     and the value of the input on the vxWorks terminal.
*     It does not allow modification of the parameter at runtime.
*
*     Block type: Monitor
*     Parameters: <sampPeriod>
*                 <inputNb>
*     Inputs: 10max
*     Outputs: none
*
*     This function block samples the value of its input signals at the 
*     specified <sampPeriod> (0.0 to send a message at the system sampling
*     period) and issues a message containing the time and the value of 
*     the inputs on the shared memory. This message is intercepted by the 
*     background task running on the master CPU. The monitored values are
*     written into the online database. The number of inputs of the block
*     is specified by <inputNb> parameter. 
*     This block does not allow modification of the parameters at runtime.
*
*     Block type: MonitorState
*     Parameters: <sampPeriod>
*                 <inputNb>
*     Inputs: 10max
*     Outputs: none
*
*     Same as Monitor block but without the averaging of received data,
*     see VLTSW-14260 JIRA ticket.
*
*     Block type: Probe
*     Parameters: <sampPeriod>
*                 <inputNb>
*     Inputs: 10max
*     Outputs: none
*
*     Block type: FastProbe
*     Parameters: <sampPeriod>
*                 <inputNb>
*     Inputs: 10max
*     Outputs: none
*
*     This function block samples the value of its input signals at the 
*     specified <sampPeriod> (0.0 to send a message at the system sampling
*     period) and issues a message containing the time and the value of
*     the inputs on the shared memory. This message is intercepted by the
*     background task running on the master CPU. The number of inputs of 
*     the block is specified by <inputNb> parameter.
*     This message is passed to an external application callback if 
*     provided.
*     This block does not allow modification of the parameters at runtime.
*     The normal Probe block has a limited sampling frequency of not more
*     than 2kHz. The reason is in particular the bottleneck between master
*     and slave CPU which all samples have to pass through a mechanism of
*     VME bus shared memory. On one-CPU-systems this limitation does not 
*     apply so you are free to choose faster sampling. You may of course
*     hit other limitations (processor speed, network traffic etc.)
*
*   FILES
*
*   ENVIRONMENT
*     TAC
*
*   RETURN VALUES 
*
*   CAUTIONS 
*     All these blocks compute an decimation value to use to trigger the 
*     sampling of the input signals, from the system sampling period. 
*     They should be created after any synchronization block.
*     
*     In order to cope with system performances, a minimum <sampPeriod>
*     is defined for each block. The minimum applied periods are 0.1 (10Hz), 
*     0.01 (100Hz) and 0.5e-3 (2kHz) for the 'Display', 'Monitor' and 
*     'Probe' blocks respectively.
*
*   EXAMPLES
*     # 5 signals @ 2Hz (monitored in the database)
*     TAC.BLOCK9.TYPE    Monitor
*     TAC.BLOCK9.NAME    Monitor
*     TAC.BLOCK9.PARAM   0.5, 5
*
*     # 5 signals @ the RT algorithm sampling period (0.0)
*     TAC.BLOCK10.TYPE   Probe
*     TAC.BLOCK10.NAME   Probe
*     TAC.BLOCK10.PARAM  0.0, 5
*
*   SEE ALSO
*     tacServerInstallDataCallback, tacServerRemoveDataCallback
*
*   BUGS   
*
*------------------------------------------------------------------------
*/

#define _POSIX_SOURCE 1
#include "vltPort.h"


/* 
 * System Headers
 */

#include <vxWorks.h>
#include <msgQLib.h>
#include <logLib.h>    /* logMsg ... */
#include <stdio.h>     /* printf ... */
#include <stdlib.h>    /* malloc ... */
#include <string.h>    /* memcpy ... */
#include <math.h>

/* 
 * Local Headers
 */

#include "tacStdBlock.h"
#include "tacShMsgQ.h"   

/*
 * Compilation directives
 */

/*
 * Constants
 */

#define tacSINK_DEFAULT_EXPECTED_PARAM 2

#define tacSINK_DEFAULT_INPUT_NUMBER  tacMAX_DATA_NUMBER    /* Up to 10 input channels */
#define tacSINK_DEFAULT_OUTPUT_NUMBER tacMAX_DATA_NUMBER

#define tacSINK_ONE_EXPECTED_PARAM 1
#define tacSINK_ONE_INPUT_NUMBER   1

#define tacSINK_DISPLAY_MIN_SAMPLING_PERIOD .1     /* 10Hz  */
#define tacSINK_MONITOR_MIN_SAMPLING_PERIOD .01    /* 100Hz */
#define tacSINK_PROBE_MIN_SAMPLING_PERIOD .5e-3    /* 2kHz  */

#define tacSINK_DISPLAY_LOG_STRING_SIZE 128

#define tacSINK_MONITOR_DEFAULT_DB_POINT "<alias>tac:data:monitor"
#define tacSINK_MONITOR_MAX_DB_ADDR_SIZE 64

#define tacSINK_MONITOR_QUEUE_SIZE 6
#define tacSINK_PROBE_QUEUE_SIZE 6


typedef enum {
  tacSINK_SAMPLING_PERIOD = 0,
  tacSINK_INPUT_NUMBER,
  tacSINK_MONITOR_DB_POINT,
}tacSINK_SCOPE_PARAM_INDEX;

/* 
 * Types
 */

typedef struct {
    unsigned short rate;
    unsigned short counter;
    unsigned short inputNb;
    unsigned int lost; 
    double rateParam;
    char* dbPoint;
    MSG_Q_ID msgq;
    tacSHMSG shmMsg;
}tacSINK_SCOPE_LOCAL_PARAMETER;

/*
 * Function declaration
 */

extern STATUS tacDpcPushRequest(tacVOIDFCTPTR pFunct, void* parameter);

/*
 * Local function declaration
 */

/*
 * Note: Routines called from within the algorithm must be declared with
 *       static __inline__ <prototype>
 */

STATUS tacSinkSendMonitorMessage(MSG_Q_ID * msgqvoid);
STATUS tacSinkSendMonitorStateMessage(MSG_Q_ID * msgqvoid);
STATUS tacSinkSendProbeMessage(MSG_Q_ID * msgq);
STATUS tacSinkDeleteMessageQueue(MSG_Q_ID * msgq);
STATUS tacSinkCommonBlockConstructor(tacSTDBLOCK* pSelf, tacERROR* error);

/*
 * Global variables
 */

char logStr[tacSINK_DISPLAY_LOG_STRING_SIZE];    /* Logging string used for Display */


/*
 * Function definition
 */

/*
 * Constructor Hooks
 */

STATUS tacSinkDisplayBlockConstructor (tacSTDBLOCK* pSelf,
                                       tacSTDBLOCK_PARAM* parameter,
			               tacERROR* error)
{
  double* pRealParam = NULL;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = NULL;

  /* Check that the expected number of parameters has been received */
  if (parameter->number < tacSINK_ONE_EXPECTED_PARAM)
    {
      char errorParam[40];

      sprintf(errorParam, "Expected %d, received %d", tacSINK_ONE_EXPECTED_PARAM, parameter->number);
      tacRTC_ERR(tacERR_PARAM_NUMBER, errorParam);
      return ERROR;
    }

  /* Check that the parameters are valid */
  pRealParam = (double*) parameter->buffer;

  /* Passed parameter is the sampling period of the block */
  if (pRealParam[tacSINK_SAMPLING_PERIOD] < (*pSelf->shared).samplingPeriod)
    {
      pRealParam[tacSINK_SAMPLING_PERIOD] = (*pSelf->shared).samplingPeriod;
    }

  if (pRealParam[tacSINK_SAMPLING_PERIOD] < tacSINK_DISPLAY_MIN_SAMPLING_PERIOD)
    {
      pRealParam[tacSINK_SAMPLING_PERIOD] = tacSINK_DISPLAY_MIN_SAMPLING_PERIOD;
    }

  /* Call common constructor */
  if (tacSinkCommonBlockConstructor(pSelf, error) != OK)
    {
      return ERROR;
    }

  /* Store parameters */
  pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;    /* We will only store short integers */

  /*
   * Compute the rate at which data has to be processed - divide and take
   * the integer part of the result
   */

  pSelfParam->rate = (unsigned short) (pRealParam[tacSINK_SAMPLING_PERIOD] / (*pSelf->shared).samplingPeriod);
  pSelfParam->counter = 0;    /* Decimation counter */

  /*
   * Perform other block initialization operations
   */

  return OK;
}

STATUS tacSinkMonitorBlockConstructor (tacSTDBLOCK* pSelf,
                                       tacSTDBLOCK_PARAM* parameter,
                                       tacERROR* error)
{
  double* pRealParam = NULL;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = NULL;
  unsigned short arrayIndex = 0;

  /* Check that the expected number of parameters has been received */
  if (parameter->number < tacSINK_DEFAULT_EXPECTED_PARAM)
    {
      char errorParam[40];

      sprintf(errorParam, "Expected %d, received %d", tacSINK_DEFAULT_EXPECTED_PARAM, parameter->number);
      tacRTC_ERR(tacERR_PARAM_NUMBER, errorParam);
      return ERROR;
    }

  /* Check that the parameters are valid */
  pRealParam = (double*) parameter->buffer;

  /* Passed parameter is the sampling period of the block */
  if (pRealParam[tacSINK_SAMPLING_PERIOD] < (*pSelf->shared).samplingPeriod)
    {
      pRealParam[tacSINK_SAMPLING_PERIOD] = (*pSelf->shared).samplingPeriod;
    }

  if (pRealParam[tacSINK_SAMPLING_PERIOD] < tacSINK_MONITOR_MIN_SAMPLING_PERIOD)
    {
      pRealParam[tacSINK_SAMPLING_PERIOD] = tacSINK_MONITOR_MIN_SAMPLING_PERIOD;
    }

  /* Number of inputs is minimum 1 up to tacMSG_MAX_DATA_NUMBER */
  if ((pRealParam[tacSINK_INPUT_NUMBER] < 1) || 
      (pRealParam[tacSINK_INPUT_NUMBER] > tacMAX_DATA_NUMBER))
    {
      tacRTC_ERR(tacERR_PARAM_INVALID, "");
      return ERROR;
    }

  /* Call common constructor */
  if (tacSinkCommonBlockConstructor(pSelf, error) != OK)
    {
      return ERROR;
    }

  /* Store parameters */
  pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /*
   * Compute the rate at which data has to be processed - divide and take
   * the integer part of the result
   */

  pSelfParam->rate = (unsigned short) (pRealParam[tacSINK_SAMPLING_PERIOD] / (*pSelf->shared).samplingPeriod);
  pSelfParam->counter = 0;    /* Decimation counter */
  pSelfParam->inputNb = (unsigned short) pRealParam[tacSINK_INPUT_NUMBER];

  /* Optional parameter is an application-specific DB point */
  if (parameter->number == tacSINK_DEFAULT_EXPECTED_PARAM + 1)
    {
      printf("tacSinkMonitorBlockConstructor - DB point %s\n",(char*) &(pRealParam[tacSINK_MONITOR_DB_POINT]));
      pSelfParam->dbPoint = (char*) malloc(tacSINK_MONITOR_MAX_DB_ADDR_SIZE);
      strcpy(pSelfParam->dbPoint, (char*) &(pRealParam[tacSINK_MONITOR_DB_POINT]));
    }
  else
    {
      printf("tacSinkMonitorBlockConstructor - Default DB point\n");
      pSelfParam->dbPoint = NULL;
    }

  /* Specify the number of input signals to attach to become resolved */
  tacStdBlockSetInputNumber(pSelf, pSelfParam->inputNb);

  /*
   * Perform other block initialization operations
   */

  /* Create the block internal message queue which tosses the messages out of the 
   * interrupt context. They are buffered in the queue until the DPC tasks
   * picks them up and sends them over the shared memory message queue. 
   * Sorry for that double-queuing, that's because of the M-S architecture */
  
  pSelfParam->msgq = msgQCreate(tacSINK_MONITOR_QUEUE_SIZE, sizeof(tacSHMSG), 0);
      if ( pSelfParam->msgq == NULL ) 
	  { 
	  char errorParam[40];

	  sprintf(errorParam, "Could not create probe block message queue");
	  tacRTC_ERR(tacERR_MALLOC, errorParam);
	  return ERROR;
	  } 

  /* will count if a message can't be pushed in the queue */
  pSelfParam->lost = 0;   

  /* Warn the background task that a new probe block has been created */
  pSelfParam->shmMsg.type = tacSHMSG_ADD_MONITOR;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);
  
  if (pSelfParam->dbPoint == NULL)
      {
      strcpy((char*) &(pSelfParam->shmMsg.msgBody.monitor.data), tacSINK_MONITOR_DEFAULT_DB_POINT);
      }
  else
      {
      strcpy((char*) &(pSelfParam->shmMsg.msgBody.monitor.data), *((char**) &(pSelfParam->dbPoint)));
      }

  printf("tacSinkMonitorBlockConstructor - Attach to %s\n",(char*) &(pSelfParam->shmMsg.msgBody.monitor.data));
  
  /* Send message */
  tacShMsgQSendBgInfo(&(pSelfParam->shmMsg));
  
  /* Initialize the message parameters */
  pSelfParam->shmMsg.type = tacSHMSG_MONITOR_DATA;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);
  pSelfParam->shmMsg.msgBody.monitor.data.number = pSelfParam->inputNb;    

  /* The output port is used to average the inputs over the reporting period */
  for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
      {
      pSelf->output[arrayIndex] = 0.0;
      }
  
  return OK;
}

STATUS tacSinkMonitorStateBlockConstructor (tacSTDBLOCK* pSelf,
					    tacSTDBLOCK_PARAM* parameter,
					    tacERROR* error)
{
  double* pRealParam = NULL;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = NULL;
  unsigned short arrayIndex = 0;

  /* Check that the expected number of parameters has been received */
  if (parameter->number < tacSINK_DEFAULT_EXPECTED_PARAM)
    {
      char errorParam[40];

      sprintf(errorParam, "Expected %d, received %d", tacSINK_DEFAULT_EXPECTED_PARAM, parameter->number);
      tacRTC_ERR(tacERR_PARAM_NUMBER, errorParam);
      return ERROR;
    }

  /* Check that the parameters are valid */
  pRealParam = (double*) parameter->buffer;

  /* Passed parameter is the sampling period of the block */
  if (pRealParam[tacSINK_SAMPLING_PERIOD] < (*pSelf->shared).samplingPeriod)
    {
      pRealParam[tacSINK_SAMPLING_PERIOD] = (*pSelf->shared).samplingPeriod;
    }

  if (pRealParam[tacSINK_SAMPLING_PERIOD] < tacSINK_MONITOR_MIN_SAMPLING_PERIOD)
    {
      pRealParam[tacSINK_SAMPLING_PERIOD] = tacSINK_MONITOR_MIN_SAMPLING_PERIOD;
    }

  /* Number of inputs is minimum 1 up to tacMSG_MAX_DATA_NUMBER */
  if ((pRealParam[tacSINK_INPUT_NUMBER] < 1) || 
      (pRealParam[tacSINK_INPUT_NUMBER] > tacMAX_DATA_NUMBER))
    {
      tacRTC_ERR(tacERR_PARAM_INVALID, "");
      return ERROR;
    }

  /* Call common constructor */
  if (tacSinkCommonBlockConstructor(pSelf, error) != OK)
    {
      return ERROR;
    }

  /* Store parameters */
  pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /*
   * Compute the rate at which data has to be processed - divide and take
   * the integer part of the result
   */

  pSelfParam->rate = (unsigned short) (pRealParam[tacSINK_SAMPLING_PERIOD] / (*pSelf->shared).samplingPeriod);
  pSelfParam->counter = 0;    /* Decimation counter */
  pSelfParam->inputNb = (unsigned short) pRealParam[tacSINK_INPUT_NUMBER];

  /* Optional parameter is an application-specific DB point */
  if (parameter->number == tacSINK_DEFAULT_EXPECTED_PARAM + 1)
    {
      printf("tacSinkMonitorStateBlockConstructor - DB point %s\n",(char*) &(pRealParam[tacSINK_MONITOR_DB_POINT]));
      pSelfParam->dbPoint = (char*) malloc(tacSINK_MONITOR_MAX_DB_ADDR_SIZE);
      strcpy(pSelfParam->dbPoint, (char*) &(pRealParam[tacSINK_MONITOR_DB_POINT]));
    }
  else
    {
      printf("tacSinkMonitorStateBlockConstructor - Default DB point\n");
      pSelfParam->dbPoint = NULL;
    }

  /* Specify the number of input signals to attach to become resolved */
  tacStdBlockSetInputNumber(pSelf, pSelfParam->inputNb);

  /*
   * Perform other block initialization operations
   */

  /* Create the block internal message queue which tosses the messages out of the 
   * interrupt context. They are buffered in the queue until the DPC tasks
   * picks them up and sends them over the shared memory message queue. 
   * Sorry for that double-queuing, that's because of the M-S architecture */
  
  pSelfParam->msgq = msgQCreate(tacSINK_MONITOR_QUEUE_SIZE, sizeof(tacSHMSG), 0);
      if ( pSelfParam->msgq == NULL ) 
	  { 
	  char errorParam[40];

	  sprintf(errorParam, "Could not create probe block message queue");
	  tacRTC_ERR(tacERR_MALLOC, errorParam);
	  return ERROR;
	  } 

  /* will count if a message can't be pushed in the queue */
  pSelfParam->lost = 0;   

  /* Warn the background task that a new probe block has been created */
  pSelfParam->shmMsg.type = tacSHMSG_ADD_MONITOR;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);
  
  if (pSelfParam->dbPoint == NULL)
      {
      strcpy((char*) &(pSelfParam->shmMsg.msgBody.monitor.data), tacSINK_MONITOR_DEFAULT_DB_POINT);
      }
  else
      {
      strcpy((char*) &(pSelfParam->shmMsg.msgBody.monitor.data), *((char**) &(pSelfParam->dbPoint)));
      }

  printf("tacSinkMonitorStateBlockConstructor - Attach to %s\n",(char*) &(pSelfParam->shmMsg.msgBody.monitor.data));
  
  /* Send message */
  tacShMsgQSendBgInfo(&(pSelfParam->shmMsg));
  
  /* Initialize the message parameters */
  pSelfParam->shmMsg.type = tacSHMSG_MONITOR_DATA;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);
  pSelfParam->shmMsg.msgBody.monitor.data.number = pSelfParam->inputNb;    

  /* Init output port which won't be used to average the inputs over the reporting period */
  for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
      {
      pSelf->output[arrayIndex] = 0.0;
      }
  
  return OK;
}

STATUS tacSinkProbeBlockCommonConstructor (tacSTDBLOCK* pSelf,
					   tacSTDBLOCK_PARAM* parameter,
					   double minPeriod,
					   tacERROR* error)
{
  double* pRealParam = NULL;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = NULL;

  /* Check that the expected number of parameters has been received */
  if (parameter->number < tacSINK_DEFAULT_EXPECTED_PARAM)
    {
      char errorParam[40];

      sprintf(errorParam, "Expected %d, received %d", tacSINK_DEFAULT_EXPECTED_PARAM, parameter->number);
      tacRTC_ERR(tacERR_PARAM_NUMBER, errorParam);
      return ERROR;
    }

  /* Check that the parameters are valid */
  pRealParam = (double*) parameter->buffer;

  /* Passed parameter is the sampling period of the block */
  if (pRealParam[tacSINK_SAMPLING_PERIOD] < (*pSelf->shared).samplingPeriod)
    {
      pRealParam[tacSINK_SAMPLING_PERIOD] = (*pSelf->shared).samplingPeriod;
    }

  if (pRealParam[tacSINK_SAMPLING_PERIOD] < minPeriod)
      {
      pRealParam[tacSINK_SAMPLING_PERIOD] = minPeriod;
      }

  /* Number of inputs is minimum 1 up to tacMSG_MAX_DATA_NUMBER */
  if ((pRealParam[tacSINK_INPUT_NUMBER] < 1) || 
      (pRealParam[tacSINK_INPUT_NUMBER] > tacMAX_DATA_NUMBER))
    {
      tacRTC_ERR(tacERR_PARAM_INVALID, "");
      return ERROR;
    }

  /* Call common constructor */
  if (tacSinkCommonBlockConstructor(pSelf, error) != OK)
    {
      return ERROR;
    }

  /* Store parameters */
  pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /*
   * Compute the rate at which data has to be processed - divide and take
   * the integer part of the result
   */

  pSelfParam->rateParam = pRealParam[tacSINK_SAMPLING_PERIOD];
  pSelfParam->rate = (unsigned short) (pRealParam[tacSINK_SAMPLING_PERIOD] / (*pSelf->shared).samplingPeriod);
  pSelfParam->counter = 0;    /* Decimation counter */
  pSelfParam->inputNb = (unsigned short) pRealParam[tacSINK_INPUT_NUMBER];

  /* Specify the number of input signals to attach to become resolved */
  tacStdBlockSetInputNumber(pSelf, pSelfParam->inputNb);

  /*
   * Perform other block initialization operations
   */

  /* Create the block internal message queue which gets the messages out of the 
   * interrupt context. They are buffered in the queue until the DPC tasks
   * picks them up and sends them over the shared memory message queue. 
   * Sorry for that double-queuing, that's because of the M-S architecture */
  
  /* There is only one message queue for all probe blocks! */
  pSelfParam->msgq = msgQCreate(tacSINK_PROBE_QUEUE_SIZE, sizeof(tacSHMSG), 0);
  if ( pSelfParam->msgq == NULL ) 
      { 
      char errorParam[40];
      
      sprintf(errorParam, "Could not create probe block message queue");
      tacRTC_ERR(tacERR_MALLOC, errorParam);
      return ERROR;
      }

  /* will count if a message can't be pushed in the queue */
  pSelfParam->lost = 0;   

  /* Warn the background task that a new probe block has been created */
  pSelfParam->shmMsg.type = tacSHMSG_ADD_PROBE;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);

  /* Send message */
  tacShMsgQSendDataSample(&pSelfParam->shmMsg);

  /* Initialize the message parameters */
  pSelfParam->shmMsg.type = tacSHMSG_TRANSFER_DATA;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);
  pSelfParam->shmMsg.msgBody.monitor.data.number = pSelfParam->inputNb;    

  return OK;
}

/* Probe block has a max. sampling frequency of 2kHz */
STATUS tacSinkProbeBlockConstructor (tacSTDBLOCK* pSelf,
                                     tacSTDBLOCK_PARAM* parameter,
                                     tacERROR* error)
{
    return tacSinkProbeBlockCommonConstructor (pSelf, parameter, tacSINK_PROBE_MIN_SAMPLING_PERIOD, error);
}

/* Fast probe is the same as probe but not does have the
 * limitation on the max. recording frequency */
STATUS tacSinkFastProbeBlockConstructor (tacSTDBLOCK* pSelf,
                                     tacSTDBLOCK_PARAM* parameter,
                                     tacERROR* error)
{
    return tacSinkProbeBlockCommonConstructor (pSelf, parameter, 0.0, error);
}

void tacSinkProbeBlockReset (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = 
      (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /*
   * Re-Compute the rate at which data has to be processed 
   */  
  if (pSelfParam->rateParam < (*pSelf->shared).samplingPeriod )
      {
      pSelfParam->rate = 1;
      }
  else
      {
      pSelfParam->rate = (unsigned short) 
	  (pSelfParam->rateParam / (*pSelf->shared).samplingPeriod);
      }

  return;
}

/*
 * Report parameters Hooks
 */

STATUS tacSinkDisplayBlockGetParameter (tacSTDBLOCK* pSelf,
                                        tacSTDBLOCK_PARAM* parameter,
                                        tacERROR* error)
{
  double* pRealParam = (double*) parameter->buffer;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Send back the expected number of parameters */
  parameter->number = tacSINK_ONE_EXPECTED_PARAM;

  /* Report current parameters */
  pRealParam[tacSINK_SAMPLING_PERIOD] = (double) (pSelfParam->rate * (*pSelf->shared).samplingPeriod);

  return OK;
}

STATUS tacSinkMonitorBlockGetParameter (tacSTDBLOCK* pSelf,
					tacSTDBLOCK_PARAM* parameter,
					tacERROR* error)
{
  double* pRealParam = (double*) parameter->buffer;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Send back the expected number of parameters */
  parameter->number = tacSINK_DEFAULT_EXPECTED_PARAM;

  /* Report current parameters */
  pRealParam[tacSINK_SAMPLING_PERIOD] = (double) (pSelfParam->rate * (*pSelf->shared).samplingPeriod);
  pRealParam[tacSINK_INPUT_NUMBER] = (double) pSelfParam->inputNb;

  if (*((char**) &(pSelfParam->dbPoint)) != NULL)
    {
      parameter->number += 1;
      parameter->type[tacSINK_MONITOR_DB_POINT] = tacPARAM_STRING;
      strcpy((char*) &(pRealParam[tacSINK_MONITOR_DB_POINT]), *((char**) &(pSelfParam->dbPoint)));
    }

  return OK;
}

STATUS tacSinkMonitorStateBlockGetParameter (tacSTDBLOCK* pSelf,
					     tacSTDBLOCK_PARAM* parameter,
					     tacERROR* error)
{
  double* pRealParam = (double*) parameter->buffer;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Send back the expected number of parameters */
  parameter->number = tacSINK_DEFAULT_EXPECTED_PARAM;

  /* Report current parameters */
  pRealParam[tacSINK_SAMPLING_PERIOD] = (double) (pSelfParam->rate * (*pSelf->shared).samplingPeriod);
  pRealParam[tacSINK_INPUT_NUMBER] = (double) pSelfParam->inputNb;

  if (*((char**) &(pSelfParam->dbPoint)) != NULL)
    {
      parameter->number += 1;
      parameter->type[tacSINK_MONITOR_DB_POINT] = tacPARAM_STRING;
      strcpy((char*) &(pRealParam[tacSINK_MONITOR_DB_POINT]), *((char**) &(pSelfParam->dbPoint)));
    }

  return OK;
}

STATUS tacSinkProbeBlockGetParameter (tacSTDBLOCK* pSelf,
				      tacSTDBLOCK_PARAM* parameter,
				      tacERROR* error)
{
  double* pRealParam = (double*) parameter->buffer;
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Send back the expected number of parameters */
  parameter->number = tacSINK_DEFAULT_EXPECTED_PARAM;

  /* Report current parameters */
  pRealParam[tacSINK_SAMPLING_PERIOD] = (double) (pSelfParam->rate * (*pSelf->shared).samplingPeriod);
  pRealParam[tacSINK_INPUT_NUMBER] = (double) pSelfParam->inputNb;

  return OK;
}

/*
 * Change parameters Hooks
 */

/* 
 * Algorithm Hooks
 */

void tacSinkDisplayBlockAlgorithm (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  if ((pSelfParam->counter % pSelfParam->rate) == 0)    /* It's time to process data */
    {
      sprintf(logStr,"Name: %s Time: %ld.%6ld Value: %g\n",
              pSelf->name,
              (*pSelf->shared).currentTime.seconds,
              (*pSelf->shared).currentTime.microsec,
              tacStdBlockGetInput(pSelf,0));

      tacDpcPushRequest((tacVOIDFCTPTR) &printf, logStr);

      pSelfParam->counter = 0;
    }

  pSelfParam->counter++;
}

void tacSinkMonitorBlockAlgorithm (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;
  unsigned short arrayIndex = 0;

  /* The output port is used to average the inputs over the reporting period */
  for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
      {
      pSelf->output[arrayIndex] += tacStdBlockGetInput(pSelf,arrayIndex);
      }
  
  if ((pSelfParam->counter % pSelfParam->rate) == 0)    /* It's time to process data */
      {
      tacDATA* pData = &(pSelfParam->shmMsg.msgBody.monitor.data);    /* We are only interested in message body */
      
      /* pData->number = pSelfParam->inputNb; */   
      pData->timeStamp = (*pSelf->shared).currentTime;

      /* pData points to the body of the Message */
      for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
	  {
	  pData->buffer[arrayIndex] = pSelf->output[arrayIndex] / pSelfParam->rate;
	  }

      /* Push message into probe block's message que to get the
       * data out of the interrupt comtext. */
      if ( msgQSend (pSelfParam->msgq, (char*) &(pSelfParam->shmMsg), sizeof(tacSHMSG), NO_WAIT, 0) != OK )
	  {
	  (pSelfParam->lost)++;
	  }

      /* Defer procedure call */
      tacDpcPushRequest((tacVOIDFCTPTR) &tacSinkSendMonitorMessage, &pSelfParam->msgq);

      pSelfParam->counter = 0;

      /* The output port is used to average the inputs over the reporting period */
      for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
	{
	  pSelf->output[arrayIndex] = 0.0;
	}
    }

  pSelfParam->counter++;
}

void tacSinkMonitorStateBlockAlgorithm (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;
  unsigned short arrayIndex = 0;
 
  if ((pSelfParam->counter % pSelfParam->rate) == 0)    /* It's time to process data */
      {
      tacDATA* pData = &(pSelfParam->shmMsg.msgBody.monitor.data);    /* We are only interested in message body */
      
      /* pData->number = pSelfParam->inputNb; */   
      pData->timeStamp = (*pSelf->shared).currentTime;

      /* pData points to the body of the Message */
      for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
	  {
	  pData->buffer[arrayIndex] = tacStdBlockGetInput(pSelf,arrayIndex);
	  }

      /* Push message into probe block's message que to get the
       * data out of the interrupt comtext. */
      if ( msgQSend (pSelfParam->msgq, (char*) &(pSelfParam->shmMsg), sizeof(tacSHMSG), NO_WAIT, 0) != OK )
	  {
	  (pSelfParam->lost)++;
	  }

      /* Defer procedure call */
      tacDpcPushRequest((tacVOIDFCTPTR) &tacSinkSendMonitorMessage, &pSelfParam->msgq);

      pSelfParam->counter = 0;
    }

  pSelfParam->counter++;
}

void tacSinkProbeBlockAlgorithm (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;
  unsigned short arrayIndex = 0;
  
  if ((pSelfParam->counter % pSelfParam->rate) == 0)    /* It's time to process data */
      {
      tacDATA* pData = &(pSelfParam->shmMsg.msgBody.monitor.data);    /* We are only interested in message body */

      pData->timeStamp = (*pSelf->shared).currentTime;
      
      /* pData points to the body of the Message */
      for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
	  {
	  pData->buffer[arrayIndex] = tacStdBlockGetInput(pSelf,arrayIndex);
	  }
      
      /* Push message into probe block's message que to get the
       * data out of the interrupt comtext. */
      if ( msgQSend (pSelfParam->msgq, (char*) &(pSelfParam->shmMsg), sizeof(tacSHMSG), NO_WAIT, 0) != OK )
	  {
	  (pSelfParam->lost)++;
	  }

      /* Defer procedure call:
       * tacSinkSendProbeMessage is called outside the interrupt routine 
       * and will take care to pick up the message from the probe block's 
       * message queue and send it over the shared memory message queue */
      tacDpcPushRequest((tacVOIDFCTPTR) &tacSinkSendProbeMessage, &pSelfParam->msgq);
      
      pSelfParam->counter = 0;
    }

  pSelfParam->counter++;
}

/* 
 * Reset Hooks
 */

/* 
 * Stop Hooks
 */

/*
 * Destructor Hooks
 */

/*
 * Note: This destructor is used mainly to send a MONITOR message containing
 *       0.0 in order to reset the database array (otherwise going from an 
 *       algorithm with 4 monitored data to one with 3 monitored data leaves
 *       the 4th value equal to the last ...)
 */

void tacSinkMonitorBlockDestructor (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;
  tacDATA* pData = &(pSelfParam->shmMsg.msgBody.monitor.data); 
  unsigned short arrayIndex = 0;

  /* Reset DB values */
  pSelfParam->shmMsg.type = tacSHMSG_MONITOR_DATA;
  pData = &(pSelfParam->shmMsg.msgBody.monitor.data);

  pData->number = pSelfParam->inputNb;    

  /* pData points to the body of the Message */
  for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
    {
      pData->buffer[arrayIndex] = 0.0;
    }

  /* Send message */
  tacShMsgQSendBgInfo(&pSelfParam->shmMsg);

  pSelfParam->shmMsg.type = tacSHMSG_REMOVE_MONITOR;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);

  /* Send message */
  tacShMsgQSendBgInfo(&pSelfParam->shmMsg);

  free(pSelfParam->dbPoint);

  /* delete internal message queue. Do this as deferred procedure call
   * to make sure it is only called when all messages have been send 
   * (through the same deferred call mechanism) */
  tacDpcPushRequest((tacVOIDFCTPTR) tacSinkDeleteMessageQueue, &(pSelfParam->msgq));

  return;
}

void tacSinkMonitorStateBlockDestructor (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;
  tacDATA* pData = &(pSelfParam->shmMsg.msgBody.monitor.data); 
  unsigned short arrayIndex = 0;

  /* Reset DB values */
  pSelfParam->shmMsg.type = tacSHMSG_MONITOR_DATA;
  pData = &(pSelfParam->shmMsg.msgBody.monitor.data);

  pData->number = pSelfParam->inputNb;    

  /* pData points to the body of the Message */
  for (arrayIndex = 0; arrayIndex < pSelfParam->inputNb; arrayIndex++)
    {
      pData->buffer[arrayIndex] = 0.0;
    }

  /* Send message */
  tacShMsgQSendBgInfo(&pSelfParam->shmMsg);

  pSelfParam->shmMsg.type = tacSHMSG_REMOVE_MONITOR;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);

  /* Send message */
  tacShMsgQSendBgInfo(&pSelfParam->shmMsg);

  free(pSelfParam->dbPoint);

  /* delete internal message queue. Do this as deferred procedure call
   * to make sure it is only called when all messages have been send 
   * (through the same deferred call mechanism) */
  tacDpcPushRequest((tacVOIDFCTPTR) tacSinkDeleteMessageQueue, &(pSelfParam->msgq));

  return;
}

void tacSinkProbeBlockDestructor (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  pSelfParam->shmMsg.type = tacSHMSG_REMOVE_PROBE;
  strcpy(pSelfParam->shmMsg.msgBody.monitor.name, pSelf->name);

  /* Send message */
  tacShMsgQSendDataSample(&pSelfParam->shmMsg);

  /* delete internal message queue. Do this as deferred procedure call
   * to make sure it is only called when all messages have been send 
   * (through the same deferred call mechanism) */
  tacDpcPushRequest((tacVOIDFCTPTR) tacSinkDeleteMessageQueue, &(pSelfParam->msgq));

  return;
}

/*
 * Show Hooks
 */

void tacSinkDisplayBlockShow (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Print parameter values */
  printf("   Update rate: %d\n",pSelfParam->rate);

  return;
}

void tacSinkMonitorBlockShow (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Print parameter values */
  printf("   Update rate: %d\n",pSelfParam->rate);
  printf("   Number of input signals: %d\n",pSelfParam->inputNb);
  printf("   Queued samples: %d\n", msgQNumMsgs(pSelfParam->msgq));
  printf("   Lost samples: %d\n",pSelfParam->lost);

  if (pSelfParam->dbPoint != NULL)
    {
      printf("   DB Point: %s\n", pSelfParam->dbPoint);
    }

  return;
}
 
void tacSinkMonitorStateBlockShow (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Print parameter values */
  printf("   Using MonitorState block, not averaging received data\n");
  printf("   Update rate: %d\n",pSelfParam->rate);
  printf("   Number of input signals: %d\n",pSelfParam->inputNb);
  printf("   Queued samples: %d\n", msgQNumMsgs(pSelfParam->msgq));
  printf("   Lost samples: %d\n",pSelfParam->lost);

  if (pSelfParam->dbPoint != NULL)
    {
      printf("   DB Point: %s\n", pSelfParam->dbPoint);
    }

  return;
}

void tacSinkProbeBlockShow (tacSTDBLOCK* pSelf)
{
  tacSINK_SCOPE_LOCAL_PARAMETER* pSelfParam = (tacSINK_SCOPE_LOCAL_PARAMETER*) pSelf->parameter;

  /* Print parameter values */
  printf("   Update rate: %d\n",pSelfParam->rate);
  printf("   Update rate param (sec): %f\n",pSelfParam->rateParam);
  printf("   Number of input signals: %d\n",pSelfParam->inputNb);
  printf("   Queued samples: %d\n", msgQNumMsgs(pSelfParam->msgq));
  printf("   Lost samples: %d\n",pSelfParam->lost);

  return;
}
 
/*
 * Installation function
 */

tacSTDBLOCK_TYPE tacSinkDisplayTypeInfo = {
  "Display",
  &tacSinkDisplayBlockConstructor,
  &tacSinkDisplayBlockGetParameter,
  &tacSinkDisplayBlockAlgorithm,
  NULL,
  NULL,
  NULL,
  NULL,
  &tacSinkDisplayBlockShow,
};

tacSTDBLOCK_TYPE tacSinkMonitorTypeInfo = {
  "Monitor",
  &tacSinkMonitorBlockConstructor,
  &tacSinkMonitorBlockGetParameter,
  &tacSinkMonitorBlockAlgorithm,
  NULL,
  NULL,
  NULL,
  &tacSinkMonitorBlockDestructor,
  &tacSinkMonitorBlockShow,
};

tacSTDBLOCK_TYPE tacSinkMonitorStateTypeInfo = {
  "MonitorState",
  &tacSinkMonitorStateBlockConstructor,
  &tacSinkMonitorStateBlockGetParameter,
  &tacSinkMonitorStateBlockAlgorithm,
  NULL,
  NULL,
  NULL,
  &tacSinkMonitorStateBlockDestructor,
  &tacSinkMonitorStateBlockShow,
};

/* Probe with a maximal sampling rate of 2kHz */
tacSTDBLOCK_TYPE tacSinkProbeTypeInfo = {
  "Probe",
  &tacSinkProbeBlockConstructor,
  &tacSinkProbeBlockGetParameter,
  &tacSinkProbeBlockAlgorithm,
  NULL,
  &tacSinkProbeBlockReset,
  NULL,
  &tacSinkProbeBlockDestructor,
  &tacSinkProbeBlockShow,
};

/* Fast probe is the same as probe but not does have the
 * limitation on the max. recording frequency */
tacSTDBLOCK_TYPE tacSinkFastProbeTypeInfo = {
  "FastProbe",
  &tacSinkFastProbeBlockConstructor,
  &tacSinkProbeBlockGetParameter,
  &tacSinkProbeBlockAlgorithm,
  NULL,
  &tacSinkProbeBlockReset,
  NULL,
  &tacSinkProbeBlockDestructor,
  &tacSinkProbeBlockShow,
};

STATUS tacSinkBlockInitAll (void)
{
  /* Install Sink Block Hooks */
  tacStdBlockInstallNewType(&tacSinkDisplayTypeInfo);
  tacStdBlockInstallNewType(&tacSinkMonitorTypeInfo);
  tacStdBlockInstallNewType(&tacSinkMonitorStateTypeInfo);
  tacStdBlockInstallNewType(&tacSinkProbeTypeInfo);
  tacStdBlockInstallNewType(&tacSinkFastProbeTypeInfo);

  /*
   * Perform other library initialization operations
   */

  return OK;
}

/*
 * Local functions
 */

/*
 * Note: Routines called from within the algorithm must be declared with
 *       static __inline__ <prototype>
 */

STATUS tacSinkCommonBlockConstructor (tacSTDBLOCK* pSelf,
                                      tacERROR* error)
{
  /* Allocate specific arrays (pointers are initialized to NULL by StdBlock) */
  if (tacStdBlockAllocateArrays(pSelf, 0, tacSINK_DEFAULT_INPUT_NUMBER, 
				tacSINK_DEFAULT_OUTPUT_NUMBER, error) == ERROR)
    {
      return ERROR;
    }

  if (tacStdBlockAllocateArrayBySize(pSelf, tacSTDBLOCK_PARAM_ARRAY, 
				     sizeof(tacSINK_SCOPE_LOCAL_PARAMETER), error) == ERROR)
      {
      tacStdBlockReleaseArray(pSelf, tacSTDBLOCK_INPUT_ARRAY, error);
      tacStdBlockReleaseArray(pSelf, tacSTDBLOCK_OUTPUT_ARRAY, error);
      return ERROR;
      }

  /* Specify the number of input signals to attach to become resolved */
  tacStdBlockSetInputNumber(pSelf, tacSINK_ONE_INPUT_NUMBER);    /* Required */

  return OK;
}

STATUS tacSinkSendMonitorMessage(MSG_Q_ID * msgq)
{
  /* Send message using provided function (different for Monitor and Probe blocks) */
    /* Send message using provided function (different for Monitor and Probe blocks) */
    tacSHMSG msg;

    if ( msgQReceive(*msgq, (char*) &msg, sizeof(tacSHMSG), NO_WAIT) ==  sizeof(tacSHMSG) )
	{
	tacShMsgQSendBgInfo (&msg);
	}

  return OK;
}

STATUS tacSinkSendProbeMessage(MSG_Q_ID * msgq)
{
    /* Send message using provided function (different for Monitor and Probe blocks) */
    tacSHMSG msg;

    if ( msgq == NULL ) 
	{
	return ERROR;
	}

    if ( msgQReceive(*msgq, (char*) &msg, sizeof(tacSHMSG), NO_WAIT) ==  sizeof(tacSHMSG) )
	{
	tacShMsgQSendDataSample (&msg);
	}
    return OK;
}

STATUS tacSinkDeleteMessageQueue (MSG_Q_ID * msgq)
{
    /* Is called when block is destroyed and removes the internal message queue */
    STATUS status;

    status = msgQDelete (*msgq);
    *msgq = NULL;
    return status;
}


/*___oOo___*/