2014-03-03


== Introduction ==

In a clustered configuration where Stardust is deployed on multiple nodes, we run into the issue of model coherence. If a model is uploaded, deleted or versioned on one node, the local engine caches on the other nodes are not updated automatically. The cluster then behaves inconsistently, which leads to problems during process execution. It is possible to "reinit" the engine on each individual node using the "console" utility, but it is preferable to automate the process so that all engine caches are flushed whenever a model change is made on any node. This can be achieved by configuring the Stardust engine to use Hazelcast to broadcast and listen to model changes. The technique is described below.

== Hazelcast configuration ==

Add the following properties to the server-side carnot.properties file:

<source lang="text">

Carnot.Engine.Hazelcast.JcaConnectionFactoryProvider = org.eclipse.stardust.engine.spring.integration.jca.SpringAppContextHazelcastJcaConnectionFactoryProvider
Infinity.Engine.Caching = true

</source>

Refer to the [https://infinity.sungard.com/documentation/stardust/1.0/topic/org.eclipse.stardust.docs.deployment/html/applicationserversetup/hazel-spring.html Stardust documentation] for information on how to configure Hazelcast with Stardust.

== IPartitionMonitor implementation ==

Once Hazelcast has been configured properly, we only need to add a custom IPartitionMonitor SPI implementation and plug it into our web application. This implementation publishes a message to a Hazelcast topic whenever a model is deployed or deleted, and every node listening on that topic flushes its engine caches when the message arrives. Model coherence is thus maintained. The ModelWatcher class shown below provides an implementation of the IPartitionMonitor SPI.

<source lang="java">

package com.infinity.bpm.rt.integration.cache.hazelcast;

import java.util.HashMap;
import java.util.Map;

import org.eclipse.stardust.common.log.LogManager;
import org.eclipse.stardust.common.log.Logger;
import org.eclipse.stardust.engine.api.model.IModel;
import org.eclipse.stardust.engine.api.runtime.AdministrationService;
import org.eclipse.stardust.engine.api.runtime.DeploymentException;
import org.eclipse.stardust.engine.api.runtime.QueryService;
import org.eclipse.stardust.engine.api.runtime.ServiceFactory;
import org.eclipse.stardust.engine.api.web.ServiceFactoryLocator;
import org.eclipse.stardust.engine.core.runtime.beans.IUser;
import org.eclipse.stardust.engine.core.runtime.beans.IUserRealm;
import org.eclipse.stardust.engine.core.runtime.beans.QueryServiceImpl;
import org.eclipse.stardust.engine.core.runtime.beans.removethis.SecurityProperties;
import org.eclipse.stardust.engine.core.spi.monitoring.IPartitionMonitor;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.MessageListener;

@SuppressWarnings("unchecked")
public class ModelWatcher implements IPartitionMonitor, MessageListener {

    private static final Logger logger = LogManager.getLogger(ModelWatcher.class);

    // Payload of the notification; listeners ignore its value.
    private Integer globalState = 1;
    private ITopic modelTopic;
    private QueryService qService;

    public ModelWatcher() {
        // Join the cluster-wide "model" topic and listen for change notifications.
        this.modelTopic = Hazelcast.getTopic("model");
        this.modelTopic.addMessageListener(this);
        logger.info("Member added to model change listener.");
        this.qService = new QueryServiceImpl();
    }

    // Called on every listening node when a model change is published.
    @Override
    public void onMessage(Object msg) {
        logger.info("Received model change notification.");
        Map<String, String> properties = new HashMap<String, String>();
        properties.put(SecurityProperties.PARTITION, "default");
        // Credentials and partition are hard-coded here; adjust them for your environment.
        ServiceFactory sf = ServiceFactoryLocator.get("motu", "motu", properties);
        AdministrationService aService = sf.getAdministrationService();
        aService.flushCaches();
    }

    // Broadcast a notification when a model is deleted on this node.
    @Override
    public void modelDeleted(IModel arg0) throws DeploymentException {
        this.modelTopic.publish(this.globalState);
    }

    // Broadcast a notification when a model is deployed on this node.
    @Override
    public void modelDeployed(IModel arg0, boolean arg1)
            throws DeploymentException {
        this.modelTopic.publish(this.globalState);
    }

    // The user and realm callbacks below are intentionally left empty.
    @Override
    public void userCreated(IUser arg0) {
    }

    @Override
    public void userDisabled(IUser arg0) {
    }

    @Override
    public void userEnabled(IUser arg0) {
    }

    @Override
    public void userRealmCreated(IUserRealm arg0) {
    }

    @Override
    public void userRealmDropped(IUserRealm arg0) {
    }
}

</source>

The implementation is made available to the Stardust engine through the standard Java ServiceLoader mechanism. Create a file named after the fully qualified name of the SPI interface, "org.eclipse.stardust.engine.core.spi.monitoring.IPartitionMonitor", and add the fully qualified name of our class, "com.infinity.bpm.rt.integration.cache.hazelcast.ModelWatcher", to it. The contents of the file look as follows:

<source lang="text">

com.infinity.bpm.rt.integration.cache.hazelcast.ModelWatcher

</source>

We then place this file in a folder called META-INF/services under the classes directory of our web application on all nodes and restart the nodes.
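The lookup that the ServiceLoader mechanism performs can be illustrated in isolation. The following is a minimal sketch, not Stardust code: the Monitor interface and the WatcherMonitor class are hypothetical stand-ins for IPartitionMonitor and ModelWatcher, and the provider file is written to a temporary directory rather than to the web application's classes folder.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ServiceLoaderSketch {

    /** Hypothetical stand-in for an SPI interface such as IPartitionMonitor. */
    public interface Monitor {
        String name();
    }

    /** Hypothetical provider; ServiceLoader needs a public no-argument constructor. */
    public static class WatcherMonitor implements Monitor {
        public WatcherMonitor() {
        }

        @Override
        public String name() {
            return "watcher";
        }
    }

    /** Writes a provider file, then lets ServiceLoader discover the provider. */
    public static List<String> discover() throws IOException {
        // Temporary classpath root containing META-INF/services/<interface name>.
        Path root = Files.createTempDirectory("spi-sketch");
        Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
        // The file is named after the interface and lists the implementation class.
        Files.write(services.resolve(Monitor.class.getName()),
                WatcherMonitor.class.getName().getBytes(StandardCharsets.UTF_8));

        List<String> names = new ArrayList<>();
        try (URLClassLoader loader = new URLClassLoader(
                new URL[] { root.toUri().toURL() },
                ServiceLoaderSketch.class.getClassLoader())) {
            // ServiceLoader reads the provider file and instantiates each listed class.
            for (Monitor monitor : ServiceLoader.load(Monitor.class, loader)) {
                names.add(monitor.name());
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(discover());
    }
}
```

In the web application the file is simply shipped under META-INF/services on the classpath, so no custom class loader is involved; the temporary directory here only keeps the sketch self-contained.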

Upon restart, a distributed Hazelcast cache spanning all nodes is constructed and model changes made on any node are broadcast across the cache. We are thus able to automate the process of model management across cluster nodes.
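The broadcast-and-flush pattern can also be tried out without a cluster. The sketch below is an in-memory simulation under stated assumptions: Topic and Node are hypothetical classes that stand in for the Hazelcast topic and for a Stardust node with its engine cache, and no Hazelcast or Stardust API is used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ModelCoherenceSketch {

    /** Hypothetical in-memory stand-in for the cluster-wide "model" topic. */
    public static class Topic {
        private final List<Consumer<Integer>> listeners = new ArrayList<>();

        public void addListener(Consumer<Integer> listener) {
            listeners.add(listener);
        }

        public void publish(Integer message) {
            // Deliver to every registered listener, including the publisher itself.
            for (Consumer<Integer> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    /** Hypothetical cluster node with a local model cache. */
    public static class Node {
        private final Topic topic;
        private final Map<String, String> cache = new HashMap<>();
        private int flushCount = 0;

        public Node(Topic topic) {
            this.topic = topic;
            // Each node flushes its own cache when a change notification arrives.
            topic.addListener(message -> flushCache());
        }

        public void cacheModel(String id, String version) {
            cache.put(id, version);
        }

        /** Simulates a model deployment on this node: notify the whole cluster. */
        public void deployModel() {
            topic.publish(1);
        }

        private void flushCache() {
            cache.clear();
            flushCount++;
        }

        public int cachedModels() {
            return cache.size();
        }

        public int flushCount() {
            return flushCount;
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        Node nodeA = new Node(topic);
        Node nodeB = new Node(topic);
        nodeA.cacheModel("OrderProcess", "1");
        nodeB.cacheModel("OrderProcess", "1");

        nodeA.deployModel(); // a change on node A flushes the caches on both nodes

        System.out.println("Node A cached models: " + nodeA.cachedModels());
        System.out.println("Node B cached models: " + nodeB.cachedModels());
    }
}
```

As in ModelWatcher, the message payload carries no information; its arrival alone is the signal to flush.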
