sabato 5 maggio 2012

Multi thread java - synchronized and locks

An interesting sample about synchronization in multithread programming in Java.
In this sample you may also find other usefull arguments (Timers, JMX , Ejb lookup,  JMS connection and other stuff).
In this sample I have a class (XSPDataManager ) which holds a collection.
This class (and its collection) is shared between 2 process, the first one is a Task executed from a Timer and the second one is OperativeStatusBwXSP class listening on JMS broker  (exactly in onMessage  method).
Problems arise when both threads access to collection and iterate on it.
Sometimes a ConcurrentModificationException is throws because  it is not generally permissible for one thread to modify a Collection while another thread is iterating over it.
In first (wrong) example you see an incorrect solution: make Collection synchronized  and also declaring syncronized all public methods. The problems is not addressed because  OperativeStatusBwXSP and TimerTask class manage collection outside call to XSPDataManager class.
In second example a better choice: synchronization occurs on XSPDataManage instance itself, so (probably) collection is managed from one task a time.




 wrong sample 

package com.italtel.inem.inemif.jmx;


import static com.italtel.snodo.inv.util.DefaultValue.CM_ManagerService4RepositoryLocal;
import static com.italtel.snodo.inv.util.DefaultValue.CM_ManagerService4RepositoryRemote;

import com.italtel.snodo.inv.util.DefaultValue;
import com.italtel.snodo.inv.util.DefaultValue.STATUS.VALUE;
import com.italtel.snodo.inv.xml.notificationInventory.NotificationInventory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.jboss.annotation.ejb.Depends;
import org.jboss.annotation.ejb.Management;
import org.xml.sax.InputSource;
import it.oneans.common.utility.CommonPropertiesMgr;
import it.oneans.common.utility.CommonPropertiesConstants;
import com.italtel.inem.inemif.jmx.XSPDataManager.LOOKUP_MODE;
import com.italtel.inem.inemif.jmx.bw.BroadSoftWSTestConnectionClient;
import com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import com.italtel.snodo.inv.ejb.dto.InterfaceCredentialsDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationDTOBasic;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationFilterDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationInterfaceDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationPropertyDTO;
import com.italtel.snodo.inv.ejb.service.interfaces.ManagerRepository;
import com.italtel.snodo.inv.ejb.service.interfaces.ManagerRepositoryRemoteFull;
import static com.italtel.snodo.inv.util.DefaultValue.ADMIN_STATUS_ON;
import static com.italtel.snodo.inv.util.DefaultValue.ADMIN_STATUS_OFF;
import static com.italtel.snodo.inv.util.DefaultValue.PROPERTY_OPERATIVE_STATUS;
import static com.italtel.snodo.inv.util.DefaultValue.STATUS.VALUE.ON;

import javax.ejb.EJB;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
 





@org.jboss.annotation.ejb.Service(objectName = "it.oneans.iemx:service=OperativeStatusBwXSPMBeanwrong")
@Management(OperativeStatusBwXSPInterface.class)
@Depends( { "jboss.j2ee:jar=iNEM-Repository.jar,name=CMManagerServiceRepository,service=EJB3" })

/*
 * Classe esposta sia come EJB che come Bean JMX
 * Si occupa di testare lo stato operativo degli XSP di Broadsoft.
 * E' Facade di tutta la funzionalità (istanzia tutte classi dipendenti)
 * Utilizza classe BroadSoftWSTestConnectionClient per il test del web Service
 * Utilizza classe XSPDataManager (classe che mette dati in cache) per recupero dati da DB.
 * Si sottoscrive alle notifiche delle modifiche CM per vedere se qualche XSP è stato modificato e decide se
 * notificare alla classe XSPDataManager che la cache và aggiornata.
 * Se si riscontrano eccezioni del tipo java.util.ConcurrentModificationException  nella gestione della cache (dovuto a concorrenza tra il TimerTask e onMessage) si consiglia di disattivarla
 *
 */

public class OperativeStatusBwXSPwrong implements OperativeStatusBwXSPInterface , MessageListener{
   
   
    /**
     * Costante che identifica Type di tipo XSP. Da modificare se il campo Type definito su inemRepType.xml
     * dovesse cambiare.
     */
    public static final String XSP_TYPE = "XSP";
   
   
    /**
     * Logger interno della classe
     */
    private Logger logger = LogManager.getLogger(this.getClass());
   
   
   
    private LOOKUP_MODE ejbLookUPMode =  LOOKUP_MODE.LOCAL;

    /**
     * Variabile utilizzata per memorizzare stato di attivazione di Timer/Task di test di web services su XSP
     */
    private boolean isTimerActivated;
   
    ///// SEZIONE AMQ
    /**
     * url connessione a broker AMQ
     */
    String activeMqUrl;
   
    /**
     *
     *
Connessione alle code JMS.  non serializzabile
     */
    Connection jmsConnection;
   
    /*
     * Sessione jms
     */
    private Session session;
   
   
    /**
     * Serve a discriminare lo scenario di rilascio - questo MBean deve lavorare solo per scenario Telecom
     */
    private String scenario = CommonPropertiesConstants.DEFAULT_SCENARIO;
   
   
    /**
     * il timer per il task di check stato opeartivo BW
     */
    private Timer operativeStatusTimer;
   
    /**
     * il task di test vero e proprio
     */
    private OperativeStatusXSPTimerTask  operativeStatusTimerTask  ;
   
    /**
     * @see com.italtel.inem.inemif.jmx.XSPDataManager
     */
    private XSPDataManager xspDataManager ;
   
   
    /**
     * @see com.italtel.inem.inemif.jmx.bw.BroadSoftWSTestConnectionClient
     */
    private BroadSoftWSTestConnectionClient bwClient ;
   
    //Sezione x unmarshall messaggi notifica
    private JAXBContext jaxbContext;
   
    private Unmarshaller unmarshaller;


    private String remoteEjbIpAddress;
    /////////////////////////////
   
   
   
   
    /**
     * inizializzo solo oggetti serializzabili - qualunque inizializzazione viene effettuata solo se lo scenario è TI
     * NON TESTARE CON JUNIT  - prende parametri configurazione AMQ da common.properties.
     */
    public void create() throws Exception {
        logger.info("create() method Called on " + this.getClass());
        scenario = CommonPropertiesMgr.getCommonProperty(CommonPropertiesConstants.SCENARIO, CommonPropertiesConstants.DEFAULT_SCENARIO);
        if ( scenario.equals(CommonPropertiesConstants.DEFAULT_SCENARIO) ) {
            logger.info("scenario " + scenario);
            //Inizializzo la connectionFactory su base costante default (su localhost)
            activeMqUrl =  CommonPropertiesMgr.getCommonProperty(CommonPropertiesConstants.ACTIVE_MQ_CONNECTION_URL, CommonPropertiesConstants.DEFAULT_ACTIVE_MQ_CONNECTION_URL);
        }
    }

    /**
     * Metodo chiamato allo start del MBean. Istanzio tutti oggetti non serializzabili e aggetti dipendenti
     * Prerequisito : la start và chiamata dopo create opp dopo costruttore non standard (sotto).
     */
    public void start() throws Exception {
        logger.info("start() method Called on " + this.getClass());
        if (  scenario.equals(CommonPropertiesConstants.DEFAULT_SCENARIO) ) {
            // provo a collegarsi all'ejb per vedere se c'è un XSP in stato amm.vo on   
            xspDataManager = new XSPDataManager (ejbLookUPMode, this.remoteEjbIpAddress);
            bwClient = new  BroadSoftWSTestConnectionClient();   
            try {
                this.jaxbContext = JAXBContext.newInstance(NotificationInventory.class.getPackage().getName());
                unmarshaller = jaxbContext.createUnmarshaller();
            } catch (Exception e) {
                logger.error("Error: [" + e + "] creating the unmarshaller", e);
            }
            List<LogicalAggregationDTOBasic>   xspAdminON =  xspDataManager.getLAggrXSPAdminONList();
            if ( xspAdminON != null && xspAdminON.size() > 0){
                logger.debug("there is at least 1 xsp with admin status on");
                this.activateTimer();
            } else {
                logger.debug("no xsp with admin status on found ");
            }
            this.registerToAMQCMTopic();
        }
   
    }
   
    public void stop() {
        logger.info("stop() method Called on " + this.getClass());
        if (scenario.equals(CommonPropertiesConstants.DEFAULT_SCENARIO) ){
            this.deRegisterToAMQCMTopic();
            if (this.operativeStatusTimer != null && operativeStatusTimerTask != null) {
                deActivateTimer();
            }
        }
       
    }

    public void destroy() {
        logger.info("destroy() method Called on " + this.getClass());
       
    }

    /**
     * Implementazione interfaccia listening su JMS
     */
    @Override
    public void onMessage(Message cmNotificationMessage) {
        try {
            /**
            propEnum = cmNotificationMessage.getPropertyNames();
            while   ( propEnum.hasMoreElements() ) {
                Object  propName =  propEnum.nextElement();
               
                logger.debug(    propName);
                logger.debug(cmNotificationMessage.getStringProperty(propName.toString()));
            }
            */
            String messageText = ((TextMessage) cmNotificationMessage).getText();
            logger.info("Received a new message [" + messageText + "]");
            if  (   (cmNotificationMessage.getStringProperty("entity-type") != null ) &&
                    (cmNotificationMessage.getStringProperty("entity-type").equalsIgnoreCase("VIPBX")   ) &&  
                    (cmNotificationMessage instanceof TextMessage)  )
            {
                logger.info("Messaggio per VIPBX");
                List <LogicalAggregationDTOBasic> xspAdminStatusON = null;
                // verifico se attivare/disattivare timer in base agli xsp in stato amm.vo ON
                try {
                    //ATTENZIONE: BISOGNA NOTIFICARE IMMEDIATAMENTE AL DATAMANAGER CHE I DATI SONO CAMBIATI !!!!!
                    xspAdminStatusON = this.xspDataManager.notifyXspDataChangedOnDB();
                    //1th priority task - disable timers
                    if (      (  isTimerActivated  )    &&      
                        (   xspAdminStatusON == null  ||     xspAdminStatusON.size() == 0     )    )
                    {                                                                          
                        deActivateTimer();
                    }   
                } catch (IllegalStateException e) {
                    logger.error("Timer exception ", e);
                   
                } catch (Exception e) {
                    logger.error("ERROR RETRIEVING DATA FROM INEM_REPOSITORY", e);
                   
                }   
                //2th priority task - verifico se disattivare stato operativo dell'XSP dell'entity a di cui è arrivata notifica
                try {
                    NotificationInventory notification = (NotificationInventory) unmarshaller.unmarshal(new InputSource(new StringReader(messageText)));
                    String fullQualifiedName = notification.getEvent().getEntityId();
                    logger.debug("unmarshal message" + notification.getEvent().toString());
                    LogicalAggregationDTO vipbx = xspDataManager.getLogicalAggregationDTOByFullQualifiedName(fullQualifiedName);
                    List<? super LogicalAggregationDTOBasic>  childrenList  =   vipbx.getChildren();
                    for ( Object laggr   : childrenList ) {
                        if (laggr instanceof LogicalAggregationDTOBasic ) {
                            LogicalAggregationDTOBasic logicalAggregation = ((LogicalAggregationDTOBasic)laggr);
                            logger.debug("child " + logicalAggregation.getFullQualifiedName());
                            if (     ( logicalAggregation.getType().equals(XSP_TYPE) ) &&
                                ( logicalAggregation.getStatoAmministrativo().equals(ADMIN_STATUS_OFF) ) &&
                                ( logicalAggregation.getLogicalAggregationProperty(PROPERTY_OPERATIVE_STATUS).getValue().equals(ON.toString()) )
                            ) {
                                //Parte in questo caso una nuova notifica
                                logger.debug("xsp with admin OFF  - turn OFF operative status" + logicalAggregation.getFullQualifiedName());
                                xspDataManager.setXSPsOperativeStatus(logicalAggregation.getFullQualifiedName(), VALUE.OFF);
                                logger.debug("done");
                            }
                        }
                    }
                }catch (JAXBException jbe) {
                    logger.error("UNMARSHAL exception " , jbe);
                } catch (Exception  e ) {
                    logger.error("errore connessione inem_rep", e);
                }
                //3th task attivo timer solo al termine degli altri aggiornamenti
                if (    ( ! isTimerActivated  )        &&
                        (  xspAdminStatusON != null )  &&
                        ( xspAdminStatusON.size() > 0 )      
                    ){
                        activateTimer();
                    }
            } else {
                logger.debug("Messaggio scartato non inerente VIPBX");
            }
           
        } catch (JMSException e) {
            logger.error("JMS ERROR ON RETRIEVING MESSAGE" , e );
           
        }
    }
   
    /**
     * Costruttore utilizzabile per richiedere lookup remota degli ejb e delle code amq
     *
     * @param remoteHostIpAddress ip server remoto -
     */
    public OperativeStatusBwXSPwrong (String remoteHostIpAddress){
        this.ejbLookUPMode = LOOKUP_MODE.REMOTE;
        this.remoteEjbIpAddress = remoteHostIpAddress;
        this.activeMqUrl= "failover://tcp://" + remoteHostIpAddress +":61616";

       
    }
   
   
    /**
     * Metodo di registrazione alla topic amq
     *   
     */
    public  void registerToAMQCMTopic () throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(activeMqUrl);
        jmsConnection = connectionFactory.createConnection();
        session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination notificationTopic = session.createTopic("SYSTEM.INVENTORY.NOTIFICATION.JMSTOPIC");
        MessageConsumer consumer = session.createConsumer(notificationTopic, null);   
        consumer.setMessageListener(this);
        //Mi metto in ascolto sulla coda   
        jmsConnection.start();
        logger.info("TrapMqReceiver successfull started!");
       
    }

    /*
     * si deregistra da Topic AMQ
     * (non-Javadoc)
     * @see com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface#deRegisterToAMQCMTopic()
     */
    public  void deRegisterToAMQCMTopic ()   {
        try {
            if (this.session != null ) {
                session.close();
            }
            if (this.jmsConnection != null ) {
                jmsConnection.close();
            }
        } catch (JMSException jmsexc){
         logger.warn("Failure Deregistrering to AMQ", jmsexc);
        }
    }
   
   
    /*
     * Attiva timer di test dei ws su XSP Admin ON
     * (non-Javadoc)
     * @see com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface#activateTimer()
     */
    public  void activateTimer () {
        logger.info("Timer activating");
        operativeStatusTimer = new Timer(true);
        long testInterval = Long.valueOf(CommonPropertiesMgr.getCommonProperty(CommonPropertiesConstants.PROP_VPBX_XSP_OP_STATUS_CHECK_INTERVAL_MILLIS , CommonPropertiesConstants.DEFAULT_PROP_VPBX_XSP_OP_STATUS_CHECK_INTERVAL_MILLIS));    
        operativeStatusTimerTask = new OperativeStatusXSPTimerTask(testInterval, xspDataManager, bwClient);
        //WARNING - se il timer parte troppo presto rischio conflitti con la conclusione del metodo onMessage
        operativeStatusTimer.schedule(operativeStatusTimerTask, 10000, testInterval );
        this.isTimerActivated = true;
        logger.info("Timer activated");
    }
   

    /*
     * DISAttiva timer di test dei ws su XSP Admin ON
     * (non-Javadoc)
     * @see com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface#activateTimer()
     */
    public  void deActivateTimer () {
        logger.info("Timer deactivating");
        if (this.operativeStatusTimer != null && operativeStatusTimerTask != null) {
            operativeStatusTimerTask.cancel();
            operativeStatusTimer.cancel();
            operativeStatusTimer.purge();
        }
        this.isTimerActivated = false;
        logger.info("Timer deactivated");
    }

    @Override
    public void disableDataCache() {
        this.xspDataManager.disableCache();   
    }

    @Override
    public void enableDataCache() {
        this.xspDataManager.enableCache();   
       
    }


}



/**
 * Classe che rappresenta il task di test dello stato operativo del XSP di BroadWorks.
 * Gestisce la lista degli XSP sia verso il DB che propriamente i test dei web services
 * @author bacco
 *
 */
class OperativeStatusXSPTimerTaskWrong extends TimerTask {

    static final String OCI_CRED_TYPE = "OCI-P";
    static final String OCI_ProvUrl_ATTR = "ProvUrl";
    static final String OCI_TRANSPORT_ATTR = "Transfer Protocol";
   
   
    Logger logger = LogManager.getLogger(this.getClass());
   
    /*
     * costante che chi crea il Timer manda al task per dirgli qual'è il delay massimo da accettare tra la data schedulata e l'effettiva esecuzione.
     */
    private long maxDelay;
   
   
    private XSPDataManagerWrong dataManager;
   
   
    private BroadSoftWSTestConnectionClient bwClient;
   
   
    public OperativeStatusXSPTimerTaskWrong(long maxDelay, XSPDataManagerWrong dataManager, BroadSoftWSTestConnectionClient client ){
        this.maxDelay = maxDelay;
        this.dataManager = dataManager;
        this.bwClient = client;
    }
   

    private boolean isXSPReacheable(LogicalAggregationDTOBasic xspToTest) {
        List <LogicalAggregationInterfaceDTO>  lAggrInterfaceList = xspToTest.getLogicalAggregationMgmtInterfaceList();
        if ( lAggrInterfaceList == null || lAggrInterfaceList.size() != 1 ) {
            logger.error("trovate interfacce non previste");
            throw new IllegalStateException("1 Interface Expected ");
        }
        LogicalAggregationInterfaceDTO lAggrInterfaceDTO =    lAggrInterfaceList.get(0);
        String ipAddress = lAggrInterfaceDTO.getIpAddress();
        InterfaceCredentialsDTO    ifCredential  = lAggrInterfaceDTO.getInterfaceCredentials(OCI_CRED_TYPE);
        if ( ifCredential == null ||     ifCredential.getPort() == null  || ifCredential.getInterfaceCredentialAttribute(OCI_ProvUrl_ATTR) == null || ifCredential.getInterfaceCredentialAttribute(OCI_TRANSPORT_ATTR) == null ) {
            logger.error("trovate interfacce credential non previste");
            throw new IllegalStateException("1 Interface credential Expected ");
        }
        String port = ifCredential.getPort();
        String oci_url   =   ifCredential.getInterfaceCredentialAttribute(OCI_ProvUrl_ATTR).getValue();
        String protocol  = ifCredential.getInterfaceCredentialAttribute(OCI_TRANSPORT_ATTR).getValue();   
        String fullXSPProvisioningUrl = protocol + "://" + ipAddress + ":" + port  + oci_url;
        return this.bwClient.isXSPWebServiceReachable(fullXSPProvisioningUrl);
    }
   
   
    @Override
    public void run() {
        if (System.currentTimeMillis() - scheduledExecutionTime() >=  maxDelay) {
            logger.warn("il task di test parte oltre il max delay impostato che è di  " + maxDelay/1000 + " secondi e quindi non verrà eseguito; si consiglia di allungare periodo ");
            return;
        }
        // Perform the task
        try {
            List <LogicalAggregationDTOBasic>  xspToTestList =  dataManager.getLAggrXSPAdminONList();
            for ( LogicalAggregationDTOBasic   xspToTest      : xspToTestList ) {
                if (isXSPReacheable(xspToTest) ) {
                    dataManager.setXSPsOperativeStatus(xspToTest.getFullQualifiedName() , VALUE.ON);
                } else  {
                    dataManager.setXSPsOperativeStatus(xspToTest.getFullQualifiedName() , VALUE.OFF);
                }               
            }
        } catch (Exception e) {
            logger.error("ERROR RETRIEVING DATA FROM INEM_REPOSITORY", e );
           
        }

    }
   

}

/**
 *
 * @author lettini-bacco
 * classe specializzata recupero dati su db capace di fare cache di dati
 */
class XSPDataManagerWrong {
   
    static enum LOOKUP_MODE {LOCAL , REMOTE}
   
    private LOOKUP_MODE lookupMode ;
   
    @EJB
    public  ManagerRepository managerRepositoryLocale;
   
    private  ManagerRepositoryRemoteFull   managerRepositoryRemote;
   
    private volatile boolean reloadData = true;
   
    private  boolean enableCache = true;
   
    /*
     * Logger interno della classe
     */
    Logger logger = LogManager.getLogger(this.getClass());
   
   
    /*
     * Lista degli XSP su cui è necessario testare lo stato operativo - da rendere thread safe tutti i metodi pubblici che accedono a questo dato
     *  (non è sufficinete uso di Collections.synchronizedList)
     */
    private List<LogicalAggregationDTOBasic> lAggrXSPAdminONList = Collections.synchronizedList(new ArrayList<LogicalAggregationDTOBasic>());


    private String ipAddress;

   
    public XSPDataManagerWrong(LOOKUP_MODE lookUpMode, String ipAddress) {
        this.lookupMode = lookUpMode;
        this.ipAddress = ipAddress;
    }


    public void disableCache() {
        this.enableCache = false;
       
    }


    public   void enableCache() {
        this.enableCache = true;
       
    }


    public  void lookupRepositoryLocale() throws Exception {
        InitialContext ctx = null;
        try {
            Properties properties = new Properties();
            properties.setProperty(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "jnp://localhost:1099")); //mbean.getJnpUrl()
            properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"));
            properties.setProperty(Context.URL_PKG_PREFIXES, System.getProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"));
           
            ctx = new InitialContext(properties);
           
            logger.debug("ManagerRepository " + ctx);
            logger.debug("Lookup on Manager");
            this.managerRepositoryLocale = (ManagerRepository) ctx.lookup(CM_ManagerService4RepositoryLocal);
        } catch (NamingException e) {
            logger.debug("Lookup on Manager " ,  e);
            throw new Exception(e);
        } finally {
            if(ctx != null)
                ctx.close();
        }
    }
   
   
    public  void lookupRepositoryRemote() throws Exception {
        InitialContext ctx = null;
        try {
            Properties properties = new Properties();
            properties.setProperty(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "jnp://" + ipAddress + ":1099")); //mbean.getJnpUrl()
            properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"));
            properties.setProperty(Context.URL_PKG_PREFIXES, System.getProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"));
            ctx = new InitialContext(properties);
            logger.debug("ManagerRepository " + ctx);
            logger.debug("Lookup on Manager");
            this.managerRepositoryRemote = (ManagerRepositoryRemoteFull) ctx.lookup(CM_ManagerService4RepositoryRemote);
        } catch (NamingException e) {
            logger.debug("Lookup on Manager ", e);
            throw new Exception(e);
        } finally {
            if(ctx != null)
                ctx.close();
        }
    }
   
   
    public LogicalAggregationDTO getLogicalAggregationDTOByFullQualifiedName (String fullQualifiedName ) throws Exception{
        LogicalAggregationDTO xspLAggrDTO = null;
        if (managerRepositoryLocale != null ) {
            xspLAggrDTO = managerRepositoryLocale.findAllLinkByAsFQName(fullQualifiedName);
        } else if (managerRepositoryRemote != null ) { 
            xspLAggrDTO = managerRepositoryRemote.findAllLinkByAsFQName(fullQualifiedName);
        }
        return xspLAggrDTO;
    }
   
    //riscontrati errori in caso metodo non sincronizzato (conflitto in caso di conflitto tra setAdminStatus nella parte iterazione sulla collection)
    public synchronized List<LogicalAggregationDTOBasic> notifyXspDataChangedOnDB() throws Exception {
        this.reloadData = true;
        return this.getLAggrXSPAdminONList();
    }
   
    /*
     * Metodo di recupero XSP con stato Amm.vo ON.
     */
    private  void retrieveXSPsAdminStatusON() throws Exception {
        logger.debug("retrieving data from DB");
        LogicalAggregationFilterDTO filterDTO;
        filterDTO = new LogicalAggregationFilterDTO();
        filterDTO.addType(OperativeStatusBwXSP.XSP_TYPE);
        filterDTO.addStatoAmministrativo(ADMIN_STATUS_ON);
        List<LogicalAggregationDTOBasic> lAggrLis = null;
        if (managerRepositoryLocale != null ) {
             lAggrLis = managerRepositoryLocale.findFullLogicalAggregations(filterDTO);
        } else if (managerRepositoryRemote != null ) { 
            lAggrLis = managerRepositoryRemote.findFullLogicalAggregations(filterDTO);
        }
        if((lAggrLis == null)||(lAggrLis.size() == 0 )) {
            logger.info("NO XSP Admin Status ON found");
           
        } else if (lAggrLis.size()== 1) {
            logger.warn("retrieveXSPsAdminStatusON(), find  one XSP");
        } else if  (lAggrLis.size() > 1){
            logger.info("retrieveXSPsAdminStatusON(), find more than one XSP");
        }
        this.lAggrXSPAdminONList.addAll(lAggrLis);
    }
   
       
    //riscontrati errori in caso metodo non sincronizzato (conflitto in caso di conflitto tra setAdminStatus nella parte iterazione sulla collection)
    public synchronized  List<LogicalAggregationDTOBasic> getLAggrXSPAdminONList() throws Exception  {
        if (  this.managerRepositoryLocale == null &&
              this.lookupMode.equals(LOOKUP_MODE.LOCAL)  ) {
            lookupRepositoryLocale(); 
        }
        if  (   this.managerRepositoryRemote == null   &&
                this.lookupMode.equals(LOOKUP_MODE.REMOTE)        ) {
            this.lookupRepositoryRemote();
        }
        if (this.reloadData  ) {
            logger.debug("Reload Data");
            this.lAggrXSPAdminONList.clear();
            this.retrieveXSPsAdminStatusON();
            if (this.enableCache){
                logger.debug("cache enabled - reloadData = false");
                this.reloadData = false;
            }
        }
        return this.lAggrXSPAdminONList;
    }
       
   
   
    /*
     * @fullQualifiedName il full qualified name dell'XSP da aggiornare
     * @operativeStatus il valore dello stato operativo
     * verifica se effettivamente lo stato và o meno aggiornato.
     */
    public synchronized void  setXSPsOperativeStatus(String fullQualifiedName, DefaultValue.STATUS.VALUE operativeStatusValue ) throws Exception {
        LogicalAggregationDTO xspLAggrDTO = null;
        xspLAggrDTO = this.getXSPFromList(fullQualifiedName);
        if (xspLAggrDTO == null) {
        logger.warn("XSP NOT found in cache");
            if (managerRepositoryLocale != null ) {
                xspLAggrDTO = managerRepositoryLocale.findAllLinkByAsFQName(fullQualifiedName);
            } else if (managerRepositoryRemote != null ) { 
                xspLAggrDTO = managerRepositoryRemote.findAllLinkByAsFQName(fullQualifiedName);
            }
        }
        LogicalAggregationPropertyDTO  operativeStatusPropertyDTO = xspLAggrDTO.getLogicalAggregationProperty(PROPERTY_OPERATIVE_STATUS);
        String operativeStatusPropertyValueFromDB = operativeStatusPropertyDTO.getValue();
        logger.debug("operativeStatusPropertyValueFromDB=" + operativeStatusPropertyValueFromDB + " operativeStatusValue.toString()=" + operativeStatusValue.toString() + " operativeStatusValue=" + operativeStatusValue  );
        if (! operativeStatusPropertyValueFromDB.equals(operativeStatusValue.toString() )) {
            logger.debug("operativeStatus  da aggiornare " );
            xspLAggrDTO.setLogicalAggregationProperty(operativeStatusPropertyDTO.getName(), operativeStatusValue.toString() ) ;
            //potrei settare il reloadData a true ma è la classe OperativeStatusBwXSP che ha il compito di farlo perchè
            //avrà la notifica di questa modifica dati.
            if (managerRepositoryLocale != null ) {
                 managerRepositoryLocale.updateAllLogicalAggregationStructure(xspLAggrDTO);
            } else if (managerRepositoryRemote != null ) { 
                managerRepositoryRemote.updateAllLogicalAggregationStructure(xspLAggrDTO);
            }
        }       
    }
   
   
    private LogicalAggregationDTO getXSPFromList (String fullQualifiedName){
        for (LogicalAggregationDTOBasic laggr :  this.lAggrXSPAdminONList) {
            if (laggr.getFullQualifiedName().equals(fullQualifiedName) ) {
                logger.debug("XSP found in cache");
                return (LogicalAggregationDTO)laggr;
            }
        }
        return null;
    }
}

 
   Here a plain solution 
 package com.italtel.inem.inemif.jmx;


import static com.italtel.snodo.inv.util.DefaultValue.CM_ManagerService4RepositoryLocal;
import static com.italtel.snodo.inv.util.DefaultValue.CM_ManagerService4RepositoryRemote;
import com.italtel.snodo.inv.util.DefaultValue;
import com.italtel.snodo.inv.util.DefaultValue.STATUS.VALUE;
import com.italtel.snodo.inv.xml.notificationInventory.NotificationInventory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.jboss.annotation.ejb.Depends;
import org.jboss.annotation.ejb.Management;
import org.xml.sax.InputSource;
import it.oneans.common.utility.CommonPropertiesMgr;
import it.oneans.common.utility.CommonPropertiesConstants;
import com.italtel.inem.inemif.jmx.XSPDataManager.LOOKUP_MODE;
import com.italtel.inem.inemif.jmx.bw.BroadSoftWSTestConnectionClient;
import com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import com.italtel.snodo.inv.ejb.dto.InterfaceCredentialsDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationDTOBasic;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationFilterDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationInterfaceDTO;
import com.italtel.snodo.inv.ejb.dto.LogicalAggregationPropertyDTO;
import com.italtel.snodo.inv.ejb.service.interfaces.ManagerRepository;
import com.italtel.snodo.inv.ejb.service.interfaces.ManagerRepositoryRemoteFull;
import static com.italtel.snodo.inv.util.DefaultValue.ADMIN_STATUS_ON;
import static com.italtel.snodo.inv.util.DefaultValue.ADMIN_STATUS_OFF;
import static com.italtel.snodo.inv.util.DefaultValue.PROPERTY_OPERATIVE_STATUS;
import static com.italtel.snodo.inv.util.DefaultValue.STATUS.VALUE.ON;
import javax.ejb.EJB;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
 

@org.jboss.annotation.ejb.Service(objectName = "it.oneans.iemx:service=OperativeStatusBwXSPMBean")
@Management(OperativeStatusBwXSPInterface.class)
@Depends( { "jboss.j2ee:jar=iNEM-Repository.jar,name=CMManagerServiceRepository,service=EJB3" })

/*
 * Classe esposta sia come EJB che come Bean JMX
 * Si occupa di testare lo stato operativo degli XSP di Broadsoft.
 * E' Facade di tutta la funzionalità (istanzia tutte classi dipendenti)
 * Utilizza classe BroadSoftWSTestConnectionClient per il test del web Service
 * Utilizza classe XSPDataManager (classe che mette dati in cache) per recupero dati da DB.
 * Si sottoscrive alle notifiche delle modifiche CM per vedere se qualche XSP è stato modificato e decide se
 * notificare alla classe XSPDataManager che la cache và aggiornata.
 * Se si riscontrano eccezioni del tipo java.util.ConcurrentModificationException  nella gestione della cache (dovuto a concorrenza tra il TimerTask e onMessage) si consiglia di disattivarla
 *
 */

public class OperativeStatusBwXSP implements OperativeStatusBwXSPInterface , MessageListener{
  
  
    /**
     * Costante che identifica Type di tipo XSP. Da modificare se il campo Type definito su inemRepType.xml
     * dovesse cambiare.
     */
    public static final String XSP_TYPE = "XSP";
  
  
    /**
     * Logger interno della classe
     */
    private Logger logger = LogManager.getLogger(this.getClass());
  
  
  
    private LOOKUP_MODE ejbLookUPMode =  LOOKUP_MODE.LOCAL;

    /**
     * Variabile utilizzata per memorizzare stato di attivazione di Timer/Task di test di web services su XSP
     */
    private boolean isTimerActivated;
  
    ///// SEZIONE AMQ
    /**
     * url connessione a broker AMQ
     */
    String activeMqUrl;
  
    /**
     *
     * Connessione alle code JMS.  non serializzabile
     */
    Connection jmsConnection;
  
    /*
     * Sessione jms
     */
    private Session session;
  
  
    /**
     * Serve a discriminare lo scenario di rilascio - questo MBean deve lavorare solo per scenario Telecom
     */
    private String scenario = CommonPropertiesConstants.DEFAULT_SCENARIO;
  
  
    /**
     * il timer per il task di check stato opeartivo BW
     */
    private Timer operativeStatusTimer;
  
    /**
     * il task di test vero e proprio
     */
    private OperativeStatusXSPTimerTask  operativeStatusTimerTask  ;
  
    /**
     * @see com.italtel.inem.inemif.jmx.XSPDataManager
     */
    private XSPDataManager xspDataManager ;
  
  
    /**
     * @see com.italtel.inem.inemif.jmx.bw.BroadSoftWSTestConnectionClient
     */
    private BroadSoftWSTestConnectionClient bwClient ;
  
    //Sezione x unmarshall messaggi notifica
    private JAXBContext jaxbContext;
    
    private Unmarshaller unmarshaller;


    private String remoteEjbIpAddress;
    /////////////////////////////
      
  
  
    /**
     * inizializzo solo oggetti serializzabili - qualunque inizializzazione viene effettuata solo se lo scenario è TI
     * NON TESTARE CON JUNIT  - prende parametri configurazione AMQ da common.properties.
     */
    public void create() throws Exception {
        logger.info("create() method Called on " + this.getClass());
        scenario = CommonPropertiesMgr.getCommonProperty(CommonPropertiesConstants.SCENARIO, CommonPropertiesConstants.DEFAULT_SCENARIO);
        if ( scenario.equals(CommonPropertiesConstants.DEFAULT_SCENARIO) ) {
            logger.info("scenario " + scenario);
            //Inizializzo la connectionFactory su base costante default (su localhost)
            activeMqUrl =  CommonPropertiesMgr.getCommonProperty(CommonPropertiesConstants.ACTIVE_MQ_CONNECTION_URL, CommonPropertiesConstants.DEFAULT_ACTIVE_MQ_CONNECTION_URL);
        }
    }

    /**
     * Metodo chiamato allo start del MBean. Istanzio tutti oggetti non serializzabili e aggetti dipendenti
     * Prerequisito : la start và chiamata dopo create opp dopo costruttore non standard (sotto).
     */
    public void start() throws Exception {
        logger.info("start() method Called on " + this.getClass());
        if (  scenario.equals(CommonPropertiesConstants.DEFAULT_SCENARIO) ) {
            // provo a collegarsi all'ejb per vedere se c'è un XSP in stato amm.vo on    
            xspDataManager = new XSPDataManager (ejbLookUPMode, this.remoteEjbIpAddress);
            bwClient = new  BroadSoftWSTestConnectionClient();  
            try {
                this.jaxbContext = JAXBContext.newInstance(NotificationInventory.class.getPackage().getName());
                unmarshaller = jaxbContext.createUnmarshaller();
            } catch (Exception e) {
                logger.error("Error: [" + e + "] creating the unmarshaller", e);
            }
            List<LogicalAggregationDTOBasic>   xspAdminON =  xspDataManager.getLAggrXSPAdminONList();
            if ( xspAdminON != null && xspAdminON.size() > 0){
                logger.debug("there is at least 1 xsp with admin status on");
                this.activateTimer();
            } else {
                logger.debug("no xsp with admin status on found ");
            }
            this.registerToAMQCMTopic();
        }
  
    }
  
    public void stop() {
        logger.info("stop() method Called on " + this.getClass());
        if (scenario.equals(CommonPropertiesConstants.DEFAULT_SCENARIO) ){
            this.deRegisterToAMQCMTopic();
            if (this.operativeStatusTimer != null && operativeStatusTimerTask != null) {
                deActivateTimer();
            }
        }
      
    }

    public void destroy() {
        logger.info("destroy() method Called on " + this.getClass());
      
    }

    /**
     * Implementazione interfaccia listening su JMS
     */
    @Override
    public void onMessage(Message cmNotificationMessage) {
        try {
            /**
            propEnum = cmNotificationMessage.getPropertyNames();
            while   ( propEnum.hasMoreElements() ) {
                Object  propName =  propEnum.nextElement();
                
                logger.debug(    propName);
                logger.debug(cmNotificationMessage.getStringProperty(propName.toString()));
            }
            */
            String messageText = ((TextMessage) cmNotificationMessage).getText();
            logger.info("Received a new message [" + messageText + "]");
            if  (   (cmNotificationMessage.getStringProperty("entity-type") != null ) &&
                    (cmNotificationMessage.getStringProperty("entity-type").equalsIgnoreCase("VIPBX")   ) && 
                    (cmNotificationMessage instanceof TextMessage)  )
            {
                logger.info("Messaggio per VIPBX");
                List <LogicalAggregationDTOBasic> xspAdminStatusON = null;
                // verifico se attivare/disattivare timer in base agli xsp in stato amm.vo ON
                synchronized (xspDataManager){
  
                 try {
                        //ATTENZIONE: BISOGNA NOTIFICARE IMMEDIATAMENTE AL DATAMANAGER CHE I DATI SONO CAMBIATI !!!!!
                        xspAdminStatusON = this.xspDataManager.notifyXspDataChangedOnDB();
                        //1th priority task - disable timers
                        if (      (  isTimerActivated  )    &&     
                            (   xspAdminStatusON == null  ||     xspAdminStatusON.size() == 0     )    )
                        {                                                                           
                            deActivateTimer();
                        }  
                    } catch (IllegalStateException e) {
                        logger.error("Timer exception ", e);
                        
                    } catch (Exception e) {
                        logger.error("ERROR RETRIEVING DATA FROM INEM_REPOSITORY", e);
                        
                    }  
                    //2th priority task - verifico se disattivare stato operativo dell'XSP dell'entity a di cui è arrivata notifica
                    try {
                        NotificationInventory notification = (NotificationInventory) unmarshaller.unmarshal(new InputSource(new StringReader(messageText)));
                        String fullQualifiedName = notification.getEvent().getEntityId();
                        logger.debug("unmarshal message" + notification.getEvent().toString());
                        LogicalAggregationDTO vipbx = xspDataManager.getLogicalAggregationDTOByFullQualifiedName(fullQualifiedName);
                        List<? super LogicalAggregationDTOBasic>  childrenList  =   vipbx.getChildren();
                        for ( Object laggr   : childrenList ) {
                            if (laggr instanceof LogicalAggregationDTOBasic ) {
                                LogicalAggregationDTOBasic logicalAggregation = ((LogicalAggregationDTOBasic)laggr);
                                logger.debug("child " + logicalAggregation.getFullQualifiedName());
                                if (     ( logicalAggregation.getType().equals(XSP_TYPE) ) &&
                                    ( logicalAggregation.getStatoAmministrativo().equals(ADMIN_STATUS_OFF) ) &&
                                    ( logicalAggregation.getLogicalAggregationProperty(PROPERTY_OPERATIVE_STATUS).getValue().equals(ON.toString()) )
                                ) {
                                    //Parte in questo caso una nuova notifica
                                    logger.debug("xsp with admin OFF  - turn OFF operative status" + logicalAggregation.getFullQualifiedName());
                                    xspDataManager.setXSPsOperativeStatus(logicalAggregation.getFullQualifiedName(), VALUE.OFF);
                                    logger.debug("done");
                                }
                            }
                        }
                    } catch (JAXBException jbe) {
                        logger.error("UNMARSHAL exception " , jbe);
                    } catch (Exception  e ) {
                        logger.error("errore connessione inem_rep", e);
                    }
                }
                //3th task attivo timer solo al termine degli altri aggiornamenti
                if (( ! isTimerActivated  )        &&
                    (  xspAdminStatusON != null )  &&
                    ( xspAdminStatusON.size() > 0 )       
                ){
                    activateTimer();
                }
            } else {
                logger.debug("Messaggio scartato non inerente VIPBX");
            }
        } catch (JMSException e) {
            logger.error("JMS ERROR ON RETRIEVING MESSAGE" , e );
        }
    }
  
    /**
     * Costruttore utilizzabile per richiedere lookup remota degli ejb e delle code amq
     *
     * @param remoteHostIpAddress ip server remoto -
     */
     /**
    public OperativeStatusBwXSP (String remoteHostIpAddress){
        this.ejbLookUPMode = LOOKUP_MODE.REMOTE;
        this.remoteEjbIpAddress = remoteHostIpAddress;
        this.activeMqUrl= "failover://tcp://" + remoteHostIpAddress +":61616";
    }**/
    
  
    /**
     * Metodo di registrazione alla topic amq
     *    
     */
    public  void registerToAMQCMTopic () throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(activeMqUrl);
        jmsConnection = connectionFactory.createConnection();
        session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination notificationTopic = session.createTopic("SYSTEM.INVENTORY.NOTIFICATION.JMSTOPIC");
        MessageConsumer consumer = session.createConsumer(notificationTopic, null);  
        consumer.setMessageListener(this);
        //Mi metto in ascolto sulla coda  
        jmsConnection.start();
        logger.info("TrapMqReceiver successfull started!");
      
    }

    /*
     * si deregistra da Topic AMQ
     * (non-Javadoc)
     * @see com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface#deRegisterToAMQCMTopic()
     */
    public  void deRegisterToAMQCMTopic ()   {
        try {
            if (this.session != null ) {
                session.close();
            }
            if (this.jmsConnection != null ) {
                jmsConnection.close();
            }
        } catch (JMSException jmsexc){
         logger.warn("Failure Deregistrering to AMQ", jmsexc);
        }
    }
  
  
    /*
     * Attiva timer di test dei ws su XSP Admin ON
     * (non-Javadoc)
     * @see com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface#activateTimer()
     */
    public  void activateTimer () {
        logger.info("Timer activating");
        operativeStatusTimer = new Timer(true);
        long testInterval = Long.valueOf(CommonPropertiesMgr.getCommonProperty(CommonPropertiesConstants.PROP_VPBX_XSP_OP_STATUS_CHECK_INTERVAL_MILLIS , CommonPropertiesConstants.DEFAULT_PROP_VPBX_XSP_OP_STATUS_CHECK_INTERVAL_MILLIS));   
        operativeStatusTimerTask = new OperativeStatusXSPTimerTask(testInterval, xspDataManager, bwClient);
        //WARNING - se il timer parte troppo presto rischio conflitti con la conclusione del metodo onMessage
        operativeStatusTimer.schedule(operativeStatusTimerTask, 10000, testInterval );
        this.isTimerActivated = true;
        logger.info("Timer activated");
    }
  

    /*
     * DISAttiva timer di test dei ws su XSP Admin ON
     * (non-Javadoc)
     * @see com.italtel.inem.inemif.jmx.interfaces.OperativeStatusBwXSPInterface#activateTimer()
     */
    public  void deActivateTimer () {
        logger.info("Timer deactivating");
        if (this.operativeStatusTimer != null && operativeStatusTimerTask != null) {
            operativeStatusTimerTask.cancel();
            operativeStatusTimer.cancel();
            operativeStatusTimer.purge();
        }
        this.isTimerActivated = false;
        logger.info("Timer deactivated");
    }

    @Override
    public void disableDataCache() {
        this.xspDataManager.disableCache();  
    }

    @Override
    public void enableDataCache() {
        this.xspDataManager.enableCache();  
      
    }


}



/**
 * Classe che rappresenta il task di test dello stato operativo del XSP di BroadWorks.
 * Gestisce la lista degli XSP sia verso il DB che propriamente i test dei web services
 * @author bacco
 *
 */
class OperativeStatusXSPTimerTask extends TimerTask {

    static final String OCI_CRED_TYPE = "OCI-P";
    static final String OCI_ProvUrl_ATTR = "ProvUrl";
    static final String OCI_TRANSPORT_ATTR = "Transfer Protocol";
  
  
    Logger logger = LogManager.getLogger(this.getClass());
  
    /*
     * costante che chi crea il Timer manda al task per dirgli qual'è il delay massimo da accettare tra la data schedulata e l'effettiva esecuzione.
     */
    private long maxDelay;
  
  
    private XSPDataManager xspDataManager;
  
  
    private BroadSoftWSTestConnectionClient bwClient;
  
  
    public OperativeStatusXSPTimerTask(long maxDelay, XSPDataManager dataManager, BroadSoftWSTestConnectionClient client ){
        this.maxDelay = maxDelay;
        this.xspDataManager = dataManager;
        this.bwClient = client;
    }
  

    private boolean isXSPReacheable(LogicalAggregationDTOBasic xspToTest) {
        List <LogicalAggregationInterfaceDTO>  lAggrInterfaceList = xspToTest.getLogicalAggregationMgmtInterfaceList();
        if ( lAggrInterfaceList == null || lAggrInterfaceList.size() != 1 ) {
            logger.error("trovate interfacce non previste");
            throw new IllegalStateException("1 Interface Expected ");
        }
        LogicalAggregationInterfaceDTO lAggrInterfaceDTO =    lAggrInterfaceList.get(0);
        String ipAddress = lAggrInterfaceDTO.getIpAddress();
        InterfaceCredentialsDTO    ifCredential  = lAggrInterfaceDTO.getInterfaceCredentials(OCI_CRED_TYPE);
        if ( ifCredential == null ||     ifCredential.getPort() == null  || ifCredential.getInterfaceCredentialAttribute(OCI_ProvUrl_ATTR) == null || ifCredential.getInterfaceCredentialAttribute(OCI_TRANSPORT_ATTR) == null ) {
            logger.error("trovate interfacce credential non previste");
            throw new IllegalStateException("1 Interface credential Expected ");
        }
        String port = ifCredential.getPort();
        String oci_url   =   ifCredential.getInterfaceCredentialAttribute(OCI_ProvUrl_ATTR).getValue();
        String protocol  = ifCredential.getInterfaceCredentialAttribute(OCI_TRANSPORT_ATTR).getValue();  
        String fullXSPProvisioningUrl = protocol + "://" + ipAddress + ":" + port  + oci_url;
        return this.bwClient.isXSPWebServiceReachable(fullXSPProvisioningUrl);
    }
  
  
    @Override
    public void run() {
        if (System.currentTimeMillis() - scheduledExecutionTime() >=  maxDelay) {
            logger.warn("il task di test parte oltre il max delay impostato che è di  " + maxDelay/1000 + " secondi e quindi non verrà eseguito; si consiglia di allungare periodo ");
            return;
        }
        // Perform the task
        synchronized (xspDataManager){
            try {
                List <LogicalAggregationDTOBasic>  xspToTestList =  xspDataManager.getLAggrXSPAdminONList();
                for ( LogicalAggregationDTOBasic   xspToTest      : xspToTestList ) {
                    if (isXSPReacheable(xspToTest) ) {
                        xspDataManager.setXSPsOperativeStatus(xspToTest.getFullQualifiedName() , VALUE.ON);
                    } else  {
                        xspDataManager.setXSPsOperativeStatus(xspToTest.getFullQualifiedName() , VALUE.OFF);
                    }              
                }
            } catch (Exception e) {
                logger.error("ERROR RETRIEVING DATA FROM INEM_REPOSITORY", e );
            }
        }

    }
  

}

/**
 *
 * @author lettini-bacco
 * classe specializzata recupero dati su db capace di fare cache di dati
 */
class XSPDataManager {
  
    static enum LOOKUP_MODE {LOCAL , REMOTE}
  
    private LOOKUP_MODE lookupMode ;
  
    @EJB
    public  ManagerRepository managerRepositoryLocale;
  
    private  ManagerRepositoryRemoteFull   managerRepositoryRemote;
  
    private volatile boolean reloadData = true;
  
    private  boolean enableCache = true;
  
    /*
     * Logger interno della classe
     */
    Logger logger = LogManager.getLogger(this.getClass());
  
  
    /*
     * Lista degli XSP su cui è necessario testare lo stato operativo - da rendere thread safe tutti i metodi pubblici che accedono a questo dato
     *  (non è sufficinete uso di Collections.synchronizedList)
     */
    //private List<LogicalAggregationDTOBasic> lAggrXSPAdminONList = Collections.synchronizedList(new ArrayList<LogicalAggregationDTOBasic>());
    private List<LogicalAggregationDTOBasic> lAggrXSPAdminONList = new ArrayList<LogicalAggregationDTOBasic>();


    private String ipAddress;

  
    public XSPDataManager(LOOKUP_MODE lookUpMode, String ipAddress) {
        this.lookupMode = lookUpMode;
        this.ipAddress = ipAddress;
    }


    public void disableCache() {
        this.enableCache = false;
      
    }


    public   void enableCache() {
        this.enableCache = true;
      
    }


    public  void lookupRepositoryLocale() throws Exception {
        InitialContext ctx = null;
        try {
            Properties properties = new Properties();
            properties.setProperty(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "jnp://localhost:1099")); //mbean.getJnpUrl()
            properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"));
            properties.setProperty(Context.URL_PKG_PREFIXES, System.getProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"));
          
            ctx = new InitialContext(properties);
          
            logger.debug("ManagerRepository " + ctx);
            logger.debug("Lookup on Manager");
            this.managerRepositoryLocale = (ManagerRepository) ctx.lookup(CM_ManagerService4RepositoryLocal);
        } catch (NamingException e) {
            logger.debug("Lookup on Manager " ,  e);
            throw new Exception(e);
        } finally {
            if(ctx != null)
                ctx.close();
        }
    }
  
  
    public  void lookupRepositoryRemote() throws Exception {
        InitialContext ctx = null;
        try {
            Properties properties = new Properties();
            properties.setProperty(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "jnp://" + ipAddress + ":1099")); //mbean.getJnpUrl()
            properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"));
            properties.setProperty(Context.URL_PKG_PREFIXES, System.getProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"));
            ctx = new InitialContext(properties);
            logger.debug("ManagerRepository " + ctx);
            logger.debug("Lookup on Manager");
            this.managerRepositoryRemote = (ManagerRepositoryRemoteFull) ctx.lookup(CM_ManagerService4RepositoryRemote);
        } catch (NamingException e) {
            logger.debug("Lookup on Manager ", e);
            throw new Exception(e);
        } finally {
            if(ctx != null)
                ctx.close();
        }
    }
  
  
    public LogicalAggregationDTO getLogicalAggregationDTOByFullQualifiedName (String fullQualifiedName ) throws Exception{
        LogicalAggregationDTO xspLAggrDTO = null;
        if (managerRepositoryLocale != null ) {
            xspLAggrDTO = managerRepositoryLocale.findAllLinkByAsFQName(fullQualifiedName);
        } else if (managerRepositoryRemote != null ) {
            xspLAggrDTO = managerRepositoryRemote.findAllLinkByAsFQName(fullQualifiedName);
        }
        return xspLAggrDTO;
    }
  
    //riscontrati errori anche in caso metodo non sincronizzato (conflitto in caso di conflitto tra setAdminStatus nella parte iterazione sulla collection)
    public List<LogicalAggregationDTOBasic> notifyXspDataChangedOnDB() throws Exception {
        this.reloadData = true;
        return this.getLAggrXSPAdminONList();
    }
  
    /*
     * Metodo di recupero XSP con stato Amm.vo ON.
     */
    private  void retrieveXSPsAdminStatusON() throws Exception {
        logger.debug("retrieving data from DB");
        LogicalAggregationFilterDTO filterDTO;
        filterDTO = new LogicalAggregationFilterDTO();
        filterDTO.addType(OperativeStatusBwXSP.XSP_TYPE);
        filterDTO.addStatoAmministrativo(ADMIN_STATUS_ON);
        List<LogicalAggregationDTOBasic> lAggrLis = null;
        if (managerRepositoryLocale != null ) {
             lAggrLis = managerRepositoryLocale.findFullLogicalAggregations(filterDTO);
        } else if (managerRepositoryRemote != null ) {
            lAggrLis = managerRepositoryRemote.findFullLogicalAggregations(filterDTO);
        }
        if((lAggrLis == null)||(lAggrLis.size() == 0 )) {
            logger.info("NO XSP Admin Status ON found");
          
        } else if (lAggrLis.size()== 1) {
            logger.warn("retrieveXSPsAdminStatusON(), find  one XSP");
        } else if  (lAggrLis.size() > 1){
            logger.info("retrieveXSPsAdminStatusON(), find more than one XSP");
        }
        this.lAggrXSPAdminONList.addAll(lAggrLis);
    }
  
      
    //riscontrati errori anche in caso metodo non sincronizzato (conflitto in caso di conflitto tra setAdminStatus nella parte iterazione sulla collection)
    public synchronized  List<LogicalAggregationDTOBasic> getLAggrXSPAdminONList() throws Exception  {
        if (  this.managerRepositoryLocale == null &&
              this.lookupMode.equals(LOOKUP_MODE.LOCAL)  ) {
            lookupRepositoryLocale();
        }
        if  (   this.managerRepositoryRemote == null   &&
                this.lookupMode.equals(LOOKUP_MODE.REMOTE)        ) {
            this.lookupRepositoryRemote();
        }
        if (this.reloadData  ) {
            logger.debug("Reload Data");
            this.lAggrXSPAdminONList.clear();
            this.retrieveXSPsAdminStatusON();
            if (this.enableCache){
                logger.debug("cache enabled - reloadData = false");
                this.reloadData = false;
            }
        }
        return this.lAggrXSPAdminONList;
    }
      
  
  
    /*
     * @fullQualifiedName il full qualified name dell'XSP da aggiornare
     * @operativeStatus il valore dello stato operativo
     * verifica se effettivamente lo stato và o meno aggiornato.
     * problemi anche se synchronized
     */
    public void  setXSPsOperativeStatus(String fullQualifiedName, DefaultValue.STATUS.VALUE operativeStatusValue ) throws Exception {
        LogicalAggregationDTO xspLAggrDTO = null;
        xspLAggrDTO = this.getXSPFromList(fullQualifiedName);
        if (xspLAggrDTO == null) {
        logger.warn("XSP NOT found in cache");
            if (managerRepositoryLocale != null ) {
                xspLAggrDTO = managerRepositoryLocale.findAllLinkByAsFQName(fullQualifiedName);
            } else if (managerRepositoryRemote != null ) {
                xspLAggrDTO = managerRepositoryRemote.findAllLinkByAsFQName(fullQualifiedName);
            }
        }
        LogicalAggregationPropertyDTO  operativeStatusPropertyDTO = xspLAggrDTO.getLogicalAggregationProperty(PROPERTY_OPERATIVE_STATUS);
        String operativeStatusPropertyValueFromDB = operativeStatusPropertyDTO.getValue();
        logger.debug("operativeStatusPropertyValueFromDB=" + operativeStatusPropertyValueFromDB + " operativeStatusValue.toString()=" + operativeStatusValue.toString() + " operativeStatusValue=" + operativeStatusValue  );
        if (! operativeStatusPropertyValueFromDB.equals(operativeStatusValue.toString() )) {
            logger.debug("operativeStatus  da aggiornare " );
            xspLAggrDTO.setLogicalAggregationProperty(operativeStatusPropertyDTO.getName(), operativeStatusValue.toString() ) ;
            //potrei settare il reloadData a true ma è la classe OperativeStatusBwXSP che ha il compito di farlo perchè
            //avrà la notifica di questa modifica dati.
            if (managerRepositoryLocale != null ) {
                 managerRepositoryLocale.updateAllLogicalAggregationStructure(xspLAggrDTO);
            } else if (managerRepositoryRemote != null ) {
                managerRepositoryRemote.updateAllLogicalAggregationStructure(xspLAggrDTO);
            }
            this.reloadData = true;
        }      
    }
  
  
    private LogicalAggregationDTO getXSPFromList (String fullQualifiedName){
        for (LogicalAggregationDTOBasic laggr :  this.lAggrXSPAdminONList) {
            if (laggr.getFullQualifiedName().equals(fullQualifiedName) ) {
                logger.debug("XSP found in cache");
                return (LogicalAggregationDTO)laggr;
            }
        }
        return null;
    }

  
}

 
  



Nessun commento:

Posta un commento