Oracle Fusion Middleware – “How to Query Oracle Coherence?”

In my previous post “Oracle Fusion Middleware – Concepts of Oracle Coherence” I explained how to pre-load the cache, that is, fetch all the required data up front and put it into the cache. Today I am going to explain how to query the cache once the data is there.

Querying in Oracle Coherence is done through the set of Filters that Coherence provides. Please note that queries apply only to currently cached data; the data set should therefore be loaded entirely into the cache before queries are performed (see the pre-loading concept in my previous post).

The concept of querying is based on the ValueExtractor interface. A value extractor is used to extract an attribute from a given object for querying. Most developers need only the ReflectionExtractor implementation of this interface, which uses reflection to extract an attribute from a value object by invoking a named method, typically a getter.
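For example, a minimal sketch (assuming the EmployeeBean and CacheRepository classes from the previous post, where EmployeeBean exposes a getJob() getter):

// Extract the value returned by getJob() from each cached entry...
ValueExtractor extractor = new ReflectionExtractor("getJob");
// ...and keep only the entries whose extracted value equals "MANAGER".
Filter filter = new EqualsFilter(extractor, "MANAGER");
Set matchingEntries = CacheRepository.getEmployeeCache().entrySet(filter);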

Let me show you a fuller example that connects to my previous post:

/**
 * 
 */
package com.spark.coherence.cache.filters;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.spark.coherence.cache.repository.CacheRepository;
import com.spark.coherence.pof.beans.EmployeeBean;
import com.tangosol.util.Filter;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.extractor.ReflectionExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.GreaterFilter;

/**
 * @author Sony
 *
 */
public class EmployeeFilter {

	ValueExtractor valueExtractor;

	@SuppressWarnings("rawtypes")
	public EmployeeBean equalsFilter(String methodName, String methodParam) {
		// Extract the attribute via the given getter (e.g. "getEmployeeName") and match it for equality.
		valueExtractor = new ReflectionExtractor(methodName);
		Filter filter = new EqualsFilter(valueExtractor, methodParam);
		EmployeeBean person = null;
		for (Iterator iter = CacheRepository.getEmployeeCache().entrySet(filter).iterator(); iter.hasNext();) {
			Map.Entry entry = (Map.Entry) iter.next();
			// If more than one entry matches, the last one iterated wins.
			person = (EmployeeBean) entry.getValue();
		}
		return person;
	}
	
	@SuppressWarnings("rawtypes")
	public List<EmployeeBean> greaterThanFilter(String methodName, int methodParam) {
		// GreaterFilter performs a strict "greater than" comparison on the extracted attribute.
		valueExtractor = new ReflectionExtractor(methodName);
		Filter filter = new GreaterFilter(valueExtractor, methodParam);
		List<EmployeeBean> employeeBeans = new ArrayList<>();
		for (Iterator iter = CacheRepository.getEmployeeCache().entrySet(filter).iterator(); iter.hasNext();) {
			Map.Entry entry = (Map.Entry) iter.next();
			employeeBeans.add((EmployeeBean) entry.getValue());
		}
		return employeeBeans;
	}

}
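A minimal usage sketch of this filter class (assuming the cache has already been pre-loaded with the classic SCOTT.EMP data, as in the previous post):

EmployeeFilter employeeFilter = new EmployeeFilter();

// Single match: the employee whose getEmployeeName() returns "KING".
EmployeeBean king = employeeFilter.equalsFilter("getEmployeeName", "KING");

// All employees whose getSalary() returns a value strictly greater than 2000.
List<EmployeeBean> highEarners = employeeFilter.greaterThanFilter("getSalary", 2000);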

Happy Caching… Happy Coding 🙂

Oracle Fusion Middleware – “Concepts of Oracle Coherence”

Hi all, if you are dealing with large applications with heavy read and write loads, then every call to the database costs you dearly from a performance perspective. With that in mind, cache-driven applications are these days given more and more importance. In this tutorial I will be talking about Oracle Coherence.

Note: This article assumes that you have already downloaded Coherence and that you know how to run a cache server.

These days it is very common to pre-populate the cache before the application starts using the data, and that is exactly what I am going to do in this example.

Oracle Coherence revolves around the concept of named caches. The first thing you need to do in your code when working with Coherence is to obtain a reference to the named cache you want to work with. To do this, you need to use the CacheFactory class, which exposes the getCache method as one of its public members.
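For example, a minimal sketch (the cache name is the one used later in this post):

// Obtain a reference to the named cache; cluster members that ask for the same name share the data.
NamedCache employeeCache = CacheFactory.getCache("dist-employee-cache");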

Below is the project structure.

[coh-proj-struct – project structure image]

First of all, let us write the configurations.
Step 1: applicationContext.xml – this file gives Spring the ability to instantiate the beans. In it I have defined the DataSource bean.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

	<bean id="dataSource"
		class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
		<property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
		<property name="username" value="scott" />
		<property name="password" value="tiger" />
	</bean>
</beans>

Step 2: coherence-cache-config.xml – let us start configuring our custom cache by creating this file.

<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
	xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config http://xmlns.oracle.com/coherence/coherence-cache-config/1.0/coherence-cache-config.xsd">	<!-- The defaults element defines factory-wide default settings. -->

	<defaults>
		<serializer system-property="tangosol.coherence.serializer" />
		<socket-provider system-property="tangosol.coherence.socketprovider" />
	</defaults>

	<caching-scheme-mapping>
		<cache-mapping>
			<cache-name>dist-*</cache-name>
			<scheme-name>example-distributed</scheme-name>
			<init-params>
				<init-param>
					<param-name>back-size-limit</param-name>
					<param-value>8MB</param-value>
				</init-param>
			</init-params>
		</cache-mapping>

		<cache-mapping>
			<cache-name>*</cache-name>
			<scheme-name>example-distributed</scheme-name>
		</cache-mapping>
	</caching-scheme-mapping>

	<caching-schemes>
		<!-- Distributed caching scheme. -->
		<distributed-scheme>
			<scheme-name>example-distributed</scheme-name>
			<service-name>DistributedCache</service-name>
			<serializer>
				<instance>
					<class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
					<init-params>
						<init-param>
							<param-type>String</param-type>
							<param-value>custom-pof-config.xml</param-value>
						</init-param>
					</init-params>
				</instance>
			</serializer>
			<backing-map-scheme>
				<read-write-backing-map-scheme>
					<internal-cache-scheme>
						<local-scheme>
						</local-scheme>
					</internal-cache-scheme>
				</read-write-backing-map-scheme>
			</backing-map-scheme>
			<autostart>true</autostart>
		</distributed-scheme>
	</caching-schemes>
</cache-config>
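With these mappings, any cache whose name starts with dist- (such as the dist-employee-cache used later in this post) is handled by the example-distributed scheme, and the catch-all * mapping routes every other cache name to the same scheme.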

Step 3: custom-pof-config.xml – for every POF bean that we write, we need to add a corresponding <user-type> entry in this file.

<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
	xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config http://xmlns.oracle.com/coherence/coherence-pof-config/1.0/coherence-pof-config.xsd">
	<user-type-list>
		<!-- include all "standard" Coherence POF user types -->
		<include>coherence-pof-config.xml</include>
		<user-type>
			<type-id>1000</type-id>
			<class-name>com.spark.coherence.pof.beans.EmployeeBean</class-name>
		</user-type>
	</user-type-list>
</pof-config>

Step 4: tangosol-coherence-override.xml – we need to override this operational configuration file with our cache configuration entry, as shown below.

<?xml version='1.0'?>
<coherence
	xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config http://xmlns.oracle.com/coherence/coherence-operational-config/1.2/coherence-operational-config.xsd">
	<cluster-config>
		<!-- <member-identity> <cluster-name>my-coherance-cluster</cluster-name> 
			</member-identity> -->

		<multicast-listener>
			<address>224.12.1.0</address>
			<port>12100</port>
			<time-to-live>60</time-to-live>
		</multicast-listener>
	</cluster-config>

	<configurable-cache-factory-config>
		<init-params>
			<init-param>
				<param-type>java.lang.String</param-type>
				<param-value system-property="tangosol.coherence.cacheconfig">
					coherence-cache-config.xml
				</param-value>
			</init-param>
		</init-params>
	</configurable-cache-factory-config>
</coherence>
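Note that because the param-value carries the tangosol.coherence.cacheconfig system property, you can still point at a different cache configuration at runtime with -Dtangosol.coherence.cacheconfig=your-config.xml.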

Once done with the configuration files, let us write the JDBC connection factory class. It loads the Spring application context asynchronously on a single-threaded executor, so the first call to future.get() simply waits until the context is ready.

/**
 * 
 */
package com.spark.coherence.jdbc.connection.factory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.sql.DataSource;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @author Sony
 *
 */
public class CoherenceJdbcConnectionFactory {

	private static Future<ApplicationContext> future;
	private static ExecutorService executorService = Executors.newFixedThreadPool(1);
	
	static{
		future = executorService.submit(new Callable<ApplicationContext>() {
			@Override
			public ApplicationContext call() throws Exception {
				return new ClassPathXmlApplicationContext("applicationContext.xml");
			}
		});
	}
	
	/**
	 * @return java.sql.Connection
	 * @throws InterruptedException
	 * @throws ExecutionException
	 * @throws BeansException
	 * @throws SQLException
	 */
	public static Connection getJdbcConnection() throws InterruptedException, ExecutionException, BeansException, SQLException{
		ApplicationContext applicationContext = future.get();
		return ((DataSource)applicationContext.getBean("dataSource")).getConnection();
	}
	
	/**
	 * @return javax.sql.DataSource
	 * @throws InterruptedException
	 * @throws ExecutionException
	 * @throws BeansException
	 * @throws SQLException
	 */
	public static DataSource getJdbcDataSource() throws InterruptedException, ExecutionException, BeansException, SQLException{
		ApplicationContext applicationContext = future.get();
		return ((DataSource)applicationContext.getBean("dataSource"));
	}
	
	private static ExecutorService getCurrentExecutorService(){
		return executorService;
	}
	
	public static void shutDownCurrentExecutorService(){
		getCurrentExecutorService().shutdown();
	}
}
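A minimal usage sketch of the factory (hypothetical calling code, assuming applicationContext.xml from Step 1 is on the classpath):

import java.sql.Connection;

import com.spark.coherence.jdbc.connection.factory.CoherenceJdbcConnectionFactory;

public class ConnectionFactoryDemo {

	public static void main(String[] args) throws Exception {
		// The first call blocks until the Spring context has finished loading.
		try (Connection connection = CoherenceJdbcConnectionFactory.getJdbcConnection()) {
			System.out.println("Connected: " + !connection.isClosed());
		} finally {
			// Release the single-threaded executor so the JVM can exit cleanly.
			CoherenceJdbcConnectionFactory.shutDownCurrentExecutorService();
		}
	}
}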

Now let us create the POJO classes. There are two classes explained here: Entity.java and EmployeeBean.java. Entity.java supplies the key for the EmployeeBean objects.

/**
 * 
 */
package com.spark.coherence.pof.beans;

import java.io.IOException;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;

/**
 * @author Sony
 * 
 */
public class Entity<T> implements PortableObject{

	public T key;

	/**
	 * @return the key
	 */
	public T getKey() {
		return key;
	}

	/**
	 * @param key
	 *            the key to set
	 */
	public void setKey(T key) {
		this.key = key;
	}

	@SuppressWarnings("unchecked")
	@Override
	public void readExternal(PofReader pofReader) throws IOException {
		this.key = (T) pofReader.readObject(0);
	}

	@Override
	public void writeExternal(PofWriter pofWriter) throws IOException {
		pofWriter.writeObject(0, key);
	}

}
/**
 * 
 */
package com.spark.coherence.pof.beans;

import java.io.IOException;
import java.util.Date;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;

/**
 * @author Sony
 * 
 */
public class EmployeeBean extends Entity<Integer> {

	private int employeeId;
	private String employeeName;
	private String job;
	private int managerId;
	private Date hireDate;
	private int salary;
	private int commission;
	private int deptNo;

	public EmployeeBean() {
		// Default no-argument constructor, required for POF deserialization.
	}

	public EmployeeBean(int employeeId) {
		super.setKey(employeeId);
		this.employeeId = employeeId;
	}

	/**
	 * @return the employeeId
	 */
	public int getEmployeeId() {
		return employeeId;
	}

	/**
	 * @param employeeId
	 *            the employeeId to set
	 */
	public void setEmployeeId(int employeeId) {
		this.employeeId = employeeId;
	}

	/**
	 * @return the employeeName
	 */
	public String getEmployeeName() {
		return employeeName;
	}

	/**
	 * @param employeeName
	 *            the employeeName to set
	 */
	public void setEmployeeName(String employeeName) {
		this.employeeName = employeeName;
	}

	public String getJob() {
		return job;
	}

	public void setJob(String job) {
		this.job = job;
	}

	public int getManagerId() {
		return managerId;
	}

	public void setManagerId(int managerId) {
		this.managerId = managerId;
	}

	public Date getHireDate() {
		return hireDate;
	}

	public void setHireDate(Date hireDate) {
		this.hireDate = hireDate;
	}

	public int getSalary() {
		return salary;
	}

	public void setSalary(int salary) {
		this.salary = salary;
	}

	public int getCommission() {
		return commission;
	}

	public void setCommission(int commission) {
		this.commission = commission;
	}

	public int getDeptNo() {
		return deptNo;
	}

	public void setDeptNo(int deptNo) {
		this.deptNo = deptNo;
	}

	@Override
	public void readExternal(PofReader pofReader) throws IOException {
		this.employeeId = pofReader.readInt(0);
		this.employeeName = pofReader.readString(1);
		this.job = pofReader.readString(2);
		this.managerId = pofReader.readInt(3);
		this.hireDate = pofReader.readDate(4);
		this.salary = pofReader.readInt(5);
		this.commission = pofReader.readInt(6);
		this.deptNo = pofReader.readInt(7);
		// Restore the inherited key; it is not written to the POF stream itself.
		super.setKey(this.employeeId);
	}

	@Override
	public void writeExternal(PofWriter pofWriter) throws IOException {
		pofWriter.writeInt(0, employeeId);
		pofWriter.writeString(1, employeeName);
		pofWriter.writeString(2, job);
		pofWriter.writeInt(3, managerId);
		pofWriter.writeDate(4, hireDate);
		pofWriter.writeInt(5, salary);
		pofWriter.writeInt(6, commission);
		pofWriter.writeInt(7, deptNo);
	}

}
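One thing to keep in mind with POF: readExternal must read the attributes in exactly the same index order in which writeExternal writes them, and the type-id 1000 registered in custom-pof-config.xml is what ties this class to the POF stream.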

Let us start writing the logic to fetch the employees from the Oracle database and put them into a Map.

/**
 * 
 */
package com.spark.coherence.jdbc.dao.impl;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;

import com.spark.coherence.pof.beans.EmployeeBean;

/**
 * @author Sony
 *
 */
public class EmployeeDao {

	private JdbcTemplate jdbcTemplate;

	public EmployeeDao(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public Map<Integer, EmployeeBean> getAllEmployees() {
		String sql = "select * from emp";

		Map<Integer, EmployeeBean> map = this.jdbcTemplate.query(sql,
				new ResultSetExtractor<Map<Integer, EmployeeBean>>() {

					Map<Integer, EmployeeBean> map = new HashMap<>();

					@Override
					public Map<Integer, EmployeeBean> extractData(
							ResultSet resultSet) throws SQLException,
							DataAccessException {
						
						while (resultSet.next()) {
							EmployeeBean employeeBean = new EmployeeBean(
									resultSet.getInt("empno"));
							employeeBean.setEmployeeName(resultSet
									.getString("ename"));
							employeeBean.setJob(resultSet.getString("job"));
							employeeBean.setCommission(resultSet.getInt("comm"));
							employeeBean.setDeptNo(resultSet.getInt("deptno"));
							employeeBean.setHireDate(resultSet
									.getDate("hiredate"));
							employeeBean.setManagerId(resultSet.getInt("mgr"));
							employeeBean.setSalary(resultSet.getInt("sal"));

							map.put(employeeBean.getKey(), employeeBean);
						}

						return map;

					}
				});

		return map;
	}
}

Let us create a CacheRepository class from which we obtain the NamedCache instance, as shown below.

/**
 * 
 */
package com.spark.coherence.cache.repository;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

/**
 * @author Sony
 *
 */
public class CacheRepository {

	public static NamedCache getEmployeeCache(){
		return CacheFactory.getCache("dist-employee-cache");
	}
}
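Note that the name dist-employee-cache matches the dist-* cache-mapping in coherence-cache-config.xml, so this cache is served by the example-distributed scheme.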

Now it is time to call the service layer and put the data into the cache.

/**
 * 
 */
package com.spark.coherence.cache.service.impl;

import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.springframework.beans.BeansException;

import com.spark.coherence.cache.repository.CacheRepository;
import com.spark.coherence.jdbc.connection.factory.CoherenceJdbcConnectionFactory;
import com.spark.coherence.jdbc.dao.impl.EmployeeDao;
import com.spark.coherence.pof.beans.EmployeeBean;
import com.tangosol.net.NamedCache;


/**
 * @author Sony
 *
 */
public class CacheService {

	public void preLoadCache(){
		try {
			Map<Integer, EmployeeBean> map = new EmployeeDao(CoherenceJdbcConnectionFactory.getJdbcDataSource()).getAllEmployees();
			NamedCache namedCache = CacheRepository.getEmployeeCache();
			namedCache.putAll(map);
		} catch (BeansException | InterruptedException | ExecutionException
				| SQLException e) {
			e.printStackTrace();
		}
	}
}

That is it, we are done with the coding part. It is now time to test the application by running the main method.

/**
 * 
 */
package com.spark.main;

import java.util.ArrayList;
import java.util.Collection;

import com.spark.coherence.cache.repository.CacheRepository;
import com.spark.coherence.cache.service.impl.CacheService;
import com.spark.coherence.jdbc.connection.factory.CoherenceJdbcConnectionFactory;
import com.spark.coherence.pof.beans.EmployeeBean;
import com.tangosol.net.NamedCache;


/**
 * @author Sony
 *
 */
public class CoherenceTest {

	/**
	 * @param args
	 */
	@SuppressWarnings({ "unused", "unchecked" })
	public static void main(String[] args) {
		NamedCache namedCache = CacheRepository.getEmployeeCache();
		System.out.println(namedCache.getCacheService().getInfo().getServiceName());
		
		new CacheService().preLoadCache();
		EmployeeBean employeeBean = (EmployeeBean)namedCache.get(7782);
		System.out.println(employeeBean.getEmployeeName());

		Collection<Integer> ids = new ArrayList<>();
		ids.add(7654);
		ids.add(7698);
		
		Collection<EmployeeBean> employeeBeans = namedCache.getAll(ids).values();
		for (EmployeeBean employeeBean2 : employeeBeans) {
			System.out.println(employeeBean2.getEmployeeName());
		}
		
		CoherenceJdbcConnectionFactory.shutDownCurrentExecutorService();
	}

}

Please observe the main class carefully: I have demonstrated the two public methods get(Object) and getAll(Collection). In my next post I will demonstrate how to use CacheListeners to know when a cache entry is updated, deleted, or inserted.

Happy Coherence and Happy Coding 🙂