GemFire In-Memory Map-Reduce with Java 8

When I started studying GemFire, I decided I needed some hands-on experience with it. I enjoy learning by doing, so I decided to share my experience with you. GemFire is an in-memory distributed database that supports both a relational store (GemFire XD) and an object store (key-value maps, GemFire). For my first hands-on session I started with the object store. GemFire works well for data streams and has a distributed cache mechanism that is very easy to use. I used the VMware vFabric documentation and also this quickstart.

I started by creating the project in my IDE. With Maven, I only needed to add the repository and the dependency to my pom.xml file:

<repositories>
    <repository>
        <id>gemfire-repository</id>
        <name>Gemfire Repository</name>
        <url>http://dist.gemstone.com/maven/release</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>com.gemstone.gemfire</groupId>
        <artifactId>gemfire</artifactId>
        <version>7.0.2</version>
    </dependency>
</dependencies>

Maven then takes care of downloading the necessary JAR libraries. Creating a GemFire cache is quite simple. First you create a cache and look up its regions:


import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
// Status is the tweet type from Twitter4J
import twitter4j.Status;

public class TwitterCache {

    private final Cache cache;
    private final Region<Long, Status> tweets;
    private final Region<Long, Status> newtweets;

    {...}

    public TwitterCache() {
        // Creates the cache from the declarative XML configuration
        cache = new CacheFactory().create();
        // getRegion returns null if the region is not defined in that configuration
        tweets = cache.getRegion("tweets");
        newtweets = cache.getRegion("newtweets");
    }

    {...}

}

You can then access the cache's “regions” just like a typical Java HashMap. To store a value, you use the put(key, value) method and, to read a value, the usual get(key) method. You may use any class for your keys and values, but remember that classes used as map keys should implement the equals() and hashCode() methods.
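As a minimal sketch of that last point, the hypothetical TweetKey class below (it is not part of the original project) implements equals() and hashCode(). A plain HashMap stands in for the region so that the snippet runs without a GemFire cache; the put and get calls are the ones described above. The key is also marked Serializable, on the assumption that default Java serialization is used to ship objects between cache members.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key: a user name plus a tweet id.
final class TweetKey implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String user;
    private final long tweetId;

    TweetKey(String user, long tweetId) {
        this.user = user;
        this.tweetId = tweetId;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof TweetKey)) return false;
        TweetKey that = (TweetKey) other;
        return tweetId == that.tweetId && Objects.equals(user, that.user);
    }

    @Override
    public int hashCode() {
        // Must be consistent with equals: equal keys give equal hashes.
        return Objects.hash(user, tweetId);
    }
}

class TweetKeyDemo {
    public static void main(String[] args) {
        // A HashMap stands in for the region in this sketch.
        Map<TweetKey, String> region = new HashMap<>();
        region.put(new TweetKey("jonas", 1L), "Hello GemFire");
        // A different but equal key instance finds the same entry.
        System.out.println(region.get(new TweetKey("jonas", 1L)));
    }
}
```

Without the two overrides, the second TweetKey instance would be treated as a different key and the lookup would return null.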

Now let’s assume we have a GemFire cache full of tweets (using the example above) and we want to do a classic word count (like the wordcount example in Hadoop). Basically, we want to find the top 20 words in our data set (excluding stop words). However, you don’t want to use Hadoop because your data is in memory and you don’t want to move it to HDFS. Besides, you want a close to real-time view of what is happening on Twitter. So you decide that you need an in-memory MapReduce. That would have been hard to program some years ago, but with the new features of languages such as Scala or Java 8, you can do it very easily. For example, for our in-memory map-reduce over tweets, we just need this method:


public static Map<String, Integer> wordcount(Map<Long, String> tweets) {
    StopwordFilter stopwords = new StopwordFilter();
    Map<String, Integer> result = tweets.values().stream().parallel()
        // map step: split each tweet into an array of words
        .map((tweet) -> tweet.trim().split(" "))
        // turn each array into a stream of words
        .map((s) -> Arrays.stream(s).parallel()
            // strip every character that is not a letter, a digit, '#' or '@'
            .map(word -> word.replaceAll("[^#@\\p{L}\\p{Nd}]", ""))
            // drop stop words, empty strings and links
            .filter(stopwords)
            // collect the words of the tweet into a map from each word to its count
            .collect(Collectors
                .toMap(word -> word, word -> 1, (a, b) -> a + b)))
        // reduce step: merge the per-tweet maps into a single map
        // (get() throws NoSuchElementException if there are no tweets)
        .reduce(new ReduceOperator()).get();

    return result;
}
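The ReduceOperator class used in the last step is not shown in this post. A minimal sketch of what it could look like, assuming it is a BinaryOperator that merges two word-count maps by adding the counts of words present in both (this is an illustration, not necessarily the class shipped with the project):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for the ReduceOperator passed to reduce().
class ReduceOperator implements BinaryOperator<Map<String, Integer>> {

    @Override
    public Map<String, Integer> apply(Map<String, Integer> left,
                                      Map<String, Integer> right) {
        // Copy instead of mutating an input, so the operator stays safe
        // when a parallel stream combines partial results.
        Map<String, Integer> merged = new HashMap<>(left);
        // Map.merge inserts the count, or adds it to the existing one.
        right.forEach((word, count) -> merged.merge(word, count, Integer::sum));
        return merged;
    }
}
```

Because reduce(BinaryOperator) returns an Optional, replacing get() with orElseGet(HashMap::new) is one way to get an empty result instead of an exception when the input map has no tweets.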

Java 8 introduced a more functional programming style. As you can see in the wordcount method above, we pass the stopwords object as a predicate to the filter method. The StopwordFilter class implements the Predicate<T> interface, which requires the class to provide a test(T value) method that returns true when the value satisfies the criterion. Check the class code below:


package com.tweet.cruncher;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.function.Predicate;

/**
* Functional predicate to check if a given word is a stopword.
* @author Jonas Dias
*/
class StopwordFilter implements Predicate<String> {

    Set<String> stopwords;
    public static final String STOPWORDS_EN = "stop_en.txt";
    public static final String STOPWORDS_PT = "stop_pt.txt";

    public StopwordFilter() {
        stopwords = new HashSet<>();
        try {
            this.loadStopWords(STOPWORDS_EN);
            this.loadStopWords(STOPWORDS_PT);
        } catch (FileNotFoundException fileNotFoundException) {
            System.out.println("Error while loading stopwords file. "
            + "Please check if you have the "
            + "files " + STOPWORDS_EN + " and " + STOPWORDS_PT
            + " in your running directory.");
            System.out.println(fileNotFoundException.getMessage());
        }
    }

    @Override
    public boolean test(String t) {
        return !stopwords.contains(t.toLowerCase().trim()) && !t.isEmpty() && !t.startsWith("http");
    }

    private void loadStopWords(String filename) throws FileNotFoundException {
        // try-with-resources closes the file once all lines have been read
        try (Scanner s = new Scanner(new File(filename))) {
            while (s.hasNextLine()) {
                String stopword = s.nextLine().toLowerCase().trim();
                stopwords.add(stopword);
            }
        }
    }
}
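To try the predicate without the stop_en.txt and stop_pt.txt files, a small variant can receive its stop words in memory. The sketch below is only an illustration: InMemoryStopwordFilter and keepWords are hypothetical names, and the test method repeats the rule used by StopwordFilter.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical variant of StopwordFilter with in-memory stop words.
class InMemoryStopwordFilter implements Predicate<String> {

    private final Set<String> stopwords = new HashSet<>();

    InMemoryStopwordFilter(String... words) {
        for (String word : words) {
            stopwords.add(word.toLowerCase().trim());
        }
    }

    @Override
    public boolean test(String t) {
        // Same rule as StopwordFilter: keep non-empty words that are
        // neither stop words nor links.
        return !stopwords.contains(t.toLowerCase().trim())
                && !t.isEmpty()
                && !t.startsWith("http");
    }
}

class StopwordDemo {

    // Splits a tweet on spaces and keeps only the words the filter accepts.
    static List<String> keepWords(Predicate<String> filter, String tweet) {
        return Arrays.stream(tweet.trim().split(" "))
                .filter(filter)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Predicate<String> filter = new InMemoryStopwordFilter("the", "is", "a");
        System.out.println(keepWords(filter, "GemFire is a cache http://example.com"));
    }
}
```

Since the filter is an ordinary Predicate, it can also be combined with the default methods of that interface, such as negate() to list only the rejected words.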

GemFire keeps data in memory, so it may be important to eventually persist the data to disk. For that, I also wrote a simple class that lazily writes data to HAWQ. I use a PostgreSQL JDBC driver to connect to HAWQ and to perform inserts on the database periodically. In addition, a controller class manages the threads that collect data from Twitter using Twitter4J. I am not showing those classes here, but you can find them in the attached zip file.
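The lazy-write idea can be sketched roughly as follows. This is not the class from the zip file: LazyWriter is a hypothetical name, and the sink is a plain Consumer so that the example stays free of JDBC. In the real application the sink would presumably run the batched inserts against HAWQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical write-behind buffer: items are queued in memory and
// handed to the sink in batches on a fixed schedule.
class LazyWriter<T> implements AutoCloseable {

    private final ConcurrentLinkedQueue<T> pending = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Consumer<List<T>> sink;

    LazyWriter(Consumer<List<T>> sink, long periodSeconds) {
        this.sink = sink;
        scheduler.scheduleAtFixedRate(() -> {
            try {
                flush();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs.
                System.err.println("Flush failed: " + e.getMessage());
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    // Cheap for producers: only enqueues, never touches the sink.
    void add(T item) {
        pending.add(item);
    }

    // Drains everything queued so far and passes it to the sink as one batch.
    synchronized void flush() {
        List<T> batch = new ArrayList<>();
        T item;
        while ((item = pending.poll()) != null) {
            batch.add(item);
        }
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
    }

    // Stops the timer and writes whatever is still pending.
    @Override
    public void close() {
        scheduler.shutdown();
        flush();
    }
}
```

The threads that collect tweets only call add, so they are never blocked by the database; the price is that items still in the queue are lost if the process dies before the next flush.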

I found this example worth sharing because it shows how easily you can develop a GemFire application. In order to run the Java app with the GemFire cache, it is important to configure your cache. You can do this by placing an XML file (GemFire looks for one named cache.xml by default) in the directory from which you run your JAR file. Here is the XML I used.

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE cache PUBLIC "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN" "http://www.gemstone.com/dtd/cache7_0.dtd">
<cache>
    <cache-server port="40404" />
    <region name="tweets">
        <region-attributes refid="REPLICATE" /> <!-- region shortcut -->
    </region>
    <region name="newtweets">
        <region-attributes refid="REPLICATE" /> <!-- region shortcut -->
    </region>
    <region name="wordcounts">
        <region-attributes refid="REPLICATE" /> <!-- region shortcut -->
    </region>
</cache>

After running the cache for several minutes, you will have a good number of tweets in your cache, along with the results of the word count. However, you need some way to visualize them. You may build a client app to access the GemFire cache and read it. I will leave the details for a next post! 🙂
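Until such a client exists, one simple way to inspect the result is to rank the word counts and print the first entries. A minimal sketch, assuming the counts are available as a Map<String, Integer> like the one returned by wordcount (TopWords and top are hypothetical names):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TopWords {

    // Returns the n most frequent words, highest count first;
    // ties are broken alphabetically so the order is stable.
    static List<Map.Entry<String, Integer>> top(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up counts, only to exercise the method.
        Map<String, Integer> counts = new HashMap<>();
        counts.put("gemfire", 5);
        counts.put("java", 3);
        counts.put("hadoop", 1);
        for (Map.Entry<String, Integer> e : top(counts, 20)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```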

You can also download GemFire from the Pivotal Network and try this tutorial. The tutorial is good because it gives an overview of the different types of caches and of the gfsh utility. Interested readers may also like this post from @william_markito.

The Maven project with the code I used is available here.
