
Creating a .NET-based Visual Monitoring System for Hadoop

Summary

Generic Hadoop doesn’t provide an out-of-the-box visual monitoring system that reports on the status of all the nodes in a Hadoop cluster. This JNBridge Lab demonstrates how to create a .NET-based monitoring application that uses an existing Microsoft Windows product, Microsoft Visio, to provide a real-time snapshot of the entire Hadoop cluster.

Download the source code for this lab here, and get a PDF of this lab here.

The image below shows a .GIF preview of the app in action.

[Animated preview of the visualizer]

Introduction

One of the ideals of distributed computing is to have a cluster of machines that is utterly self-monitoring, self-healing, and self-sustaining. If something goes wrong, the system reports it, attempts to repair the problem, and, if nothing works, reports the problem to the administrator, all without causing any tasks to fail. Distributed systems like Hadoop are so popular in part because they approach these ideals to an extent that few systems have before. Hadoop in particular is expandable, redundant (albeit not in a terribly fine-grained manner), easy to use, and reliable. That said, it isn’t perfect, and any Hadoop administrator knows the importance of additional monitoring to maintaining a reliable cluster.

Part of that monitoring comes from Hadoop’s built-in webservers. These have direct access to the internals of the cluster and can tell you what jobs are running, what files are in the distributed file system, and various other bits of important information, albeit in a dense, spreadsheet-like format. A number of Hadoop packages also come with generic distributed-system monitoring apps such as Ganglia, but these aren’t integrated with Hadoop itself. Finally, there are products like Apache Ambari from Hortonworks that do a great deal of visual monitoring, but are tied to particular companies’ versions of Hadoop. In this lab we will look at the basics of producing a custom app that is integrated into the fabric of your own Hadoop cluster. In particular, being JNBridge, we are interested in building a .NET-based monitoring app that interfaces over a TCP connection with Java-based Hadoop using JNBridgePro. To expedite the process of creating a GUI for our monitoring app, we will use Microsoft Visio to easily create a visual model of the Hadoop cluster. This way we can create a rudimentary monitoring app that works as a cluster visualizer as well.

The app that we’re aiming to create for this lab is fairly simple. It will present a graph-like view of the logical topology of the cluster, where each worker node displays its status (OK or not OK), the amount of local HDFS space used, and the portion of Mappers and Reducers that are in use. We’re not looking for hard numbers here, since that information is available through the webservers; rather, our goal is to create a schematic that can be used to quickly determine the status of the various components of the cluster.

Before we begin, please bear in mind two things:

  1. We are not proposing our solution or our code as an actual option for monitoring your Hadoop cluster. We are simply proposing certain tools that can be used in the production of your own monitoring app.
  2. Our solution was produced for Hortonworks’ HDP 1.3 distribution of Hadoop 1.2.

Even in our limited testing we noticed a distinct lack of portability between different Hadoop distributions and versions — particularly where directory locations and shell-script specifications are concerned. Hopefully our explanations are clear enough that you can adjust to the needs of your own cluster, but that might not always be the case. We’re also going to assume a passing familiarity with Hadoop and Visio, since explaining either system and its internal logic in great detail would make this lab much longer than need be.

What You’ll Need

  1. Apache Hadoop (we used the Hortonworks distribution, though any will work with some effort)
  2. Visual Studio 2012
  3. Microsoft Visio 2013
  4. Visio 2013 SDK
  5. JNBridgePro 7.0

Digging into Hadoop

To begin, in order to get the most complete information about the cluster possible, we need to get hold of the NameNode and JobTracker objects that are currently running on the cluster (they manage the HDFS and MapReduce portions of Hadoop, respectively). This will expose the rich APIs of both the JobTracker and the NameNode, as well as of the individual nodes of the cluster. These are the APIs that the JSP code uses to create the built-in webserver pages, and they provide more than enough information for our purposes.

However, accessing these objects directly is somewhat difficult. By and large, Hadoop is built so the end user can only interface with the cluster via particular sockets that only meter certain information about the cluster out and only allow certain information in. Thus getting direct access to and using the APIs of the running NameNode and JobTracker isn’t something that you’re supposed to be able to do. This is a sensible safety precaution, but it makes getting the kind of information required for a monitoring app somewhat complicated. Granted, there is the org.apache.hadoop.mapred.ClusterStatus class that passes status information over the network, but the information it provides isn’t enough to create a truly robust monitoring app. Our solution to this dilemma involves a lightweight hack of Hadoop itself. Don’t worry, you’re not going to need to recompile source code, but some knowledge of that source code and the shell scripts used to run it would be helpful.

Our goal is to wedge ourselves between the scripts that run Hadoop and the process of actually instancing the NameNode and JobTracker. In so doing, we can write a program that breaks through the walled garden and allows us to serve up those objects to the .NET side directly. Technically a similar process could be used to code a similar monitoring app in pure Java, but that’s not what we’re interested in here. If things still seem a little fuzzy, hopefully you’ll get a better idea of our solution as we explain it.

When the $HADOOP_INSTALL/hadoop/bin/hadoop script is called to start the NameNode and JobTracker, it simply runs NameNode.main() and JobTracker.main(). These main() methods, in turn, run just a handful of lines of code to start the two master nodes. This process is usually hidden behind a further startup script such as start-all.sh or, in our case with Hortonworks, hadoop-daemon.sh, but these all ultimately call the same $HADOOP_INSTALL/hadoop/bin/hadoop script. In our solution, instead of having the script call NameNode.main() and JobTracker.main(), we have it call the main() methods of our own wrapper classes, which contain the code from the original main() methods and also set up the remote Java-side servers of JNBridgePro. These wrapper classes are then able to serve up the JobTracker and NameNode instances to our Windows-based monitoring app.

The JobTracker wrapper class looks like this:

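package com.jnbridge.labs.visio;

import java.util.Properties;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
import com.jnbridge.jnbcore.server.ServerException;

public class JnbpJobTrackerWrapper {

	// Set once the JobTracker has started; read from the JNBridgePro server
	// thread on behalf of the .NET side, hence volatile.
	private static volatile JobTracker theJobTracker = null;

	public static void main(String[] args) {

		// Start the JNBridgePro Java-side TCP server on port 8085
		Properties props = new Properties();
		props.put("javaSide.serverType", "tcp");
		props.put("javaSide.port", "8085");
		try {
			com.jnbridge.jnbcore.JNBMain.start(props);
		} catch (ServerException e) {
			// The bridge failed to start; the JobTracker is still started below
			e.printStackTrace();
		}

		// Core of JobTracker.main(): start the tracker, then block serving requests
		try {
			theJobTracker = JobTracker.startTracker(new JobConf());
			theJobTracker.offerService();
		} catch (Throwable e) {
			e.printStackTrace();
		}
	}

	// Called from .NET through the generated proxy; null until startTracker() returns
	public static JobTracker getJobTracker()
	{
		return theJobTracker;
	}
}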

And the NameNode wrapper class looks like this:

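package com.jnbridge.labs.visio;

import java.util.Properties;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import com.jnbridge.jnbcore.server.ServerException;

public class JnbpNameNodeWrapper {

	// Set once the NameNode has been created; read from the JNBridgePro server
	// thread on behalf of the .NET side, hence volatile.
	private static volatile NameNode theNameNode = null;

	public static void main(String[] args) {

		// Start the JNBridgePro Java-side TCP server on port 8087
		Properties props = new Properties();
		props.put("javaSide.serverType", "tcp");
		props.put("javaSide.port", "8087");
		try {
			com.jnbridge.jnbcore.JNBMain.start(props);
		} catch (ServerException e) {
			// The bridge failed to start; the NameNode is still started below
			e.printStackTrace();
		}

		// Core of NameNode.main(): create the NameNode, then block until it shuts down
		try {
			theNameNode = NameNode.createNameNode(args, null);
			if (theNameNode != null)
			{
				theNameNode.join();
			}
		} catch (Throwable e) {
			e.printStackTrace();
		}
	}

	// Called from .NET through the generated proxy; null until createNameNode() returns
	public static NameNode getNameNode()
	{
		return theNameNode;
	}
}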
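In both wrappers the static getter returns null until startTracker() or createNameNode() has returned, yet the .NET side can connect as soon as JNBMain.start() succeeds. If the monitoring app might start before the master nodes finish starting up, one option is to have the getter wait briefly for the instance to be published. Below is a minimal sketch of that idea, assuming a small helper of our own; ReadyHolder is a hypothetical name, not part of Hadoop or JNBridgePro.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one thread publishes a value once, other threads wait for it.
final class ReadyHolder<T> {
    private final CountDownLatch ready = new CountDownLatch(1);
    private volatile T value;

    // Called by the startup thread once the master node object exists.
    void publish(T v) {
        value = v;
        ready.countDown();
    }

    // Waits up to timeoutMillis for publish(); returns null on timeout or interrupt.
    T await(long timeoutMillis) {
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS) ? value : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

In JnbpJobTrackerWrapper, for example, main() would call publish() right after startTracker() returns and before offerService() blocks, and getJobTracker() would return await() with a timeout of your choosing. Because await() returns null instead of throwing a checked exception, the getter's signature, and therefore the generated proxy, would stay the same.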

To have the $HADOOP_INSTALL/hadoop/bin/hadoop script call our classes instead, we alter the following lines of code:

elif [ "$COMMAND" = "jobtracker" ] ; then
  #CLASS=org.apache.hadoop.mapred.JobTracker
  CLASS=com.jnbridge.labs.visio.JnbpJobTrackerWrapper
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOBTRACKER_OPTS"

and

elif [ "$COMMAND" = "namenode" ] ; then
  #CLASS=org.apache.hadoop.hdfs.server.namenode.NameNode
  CLASS=com.jnbridge.labs.visio.JnbpNameNodeWrapper
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"

The replacement lines are right below the commented-out original lines.

In order to finish up the Java side of this solution, we need to add our wrapper classes as well as the JNBridge .jars to the Hadoop classpath. In our case that meant simply adding the wrapper class .jars along with jnbcore.jar and bcel-5.1-jnbridge.jar to the $HADOOP_INSTALL/hadoop/lib directory. Since the Hadoop startup scripts automatically include that directory as part of the Java classpath, we don’t need to do anything else. The startup scripts that came with the Hortonworks distribution work exactly as they did before, and the cluster starts up without a hitch. Only now, our master nodes are listening for method calls from the .NET side of our monitoring app.

Monitoring on the Windows Side

To begin building the Java proxies, we need to add the .jars that contain the appropriate classes to the JNBridge classpath. Below is a screenshot from the JNBridge Visual Studio plugin of the .jars we added to the classpath used to create the proxies. Note that this includes not just the requisite JNBridge .jars and the Hadoop .jars (scraped from the machines in our cluster), but also our wrapper class .jars (here called proto1.jar).

From here we need to pick the classes that must be proxied so that they can be called natively from our C# code. The easiest way to do this is to simply select the two wrapper classes (JnbpJobTrackerWrapper and JnbpNameNodeWrapper) in the left window of the JNBridge Visual Studio proxy tool and click the Add+ button. JNBridge will take care of the rest automatically.

Now we can build the monitoring app itself. When beginning your new project, make sure to add the correct references. You need to reference the correct JNBShare.dll for your version of .NET, the .dll created by the JNBridge proxy process you performed earlier, and the Microsoft.Office.Interop.Visio.dll for your version of Microsoft Visio. We used Visio 2013 for this project along with its SDK (which is a newer version than what came with Visual Studio 2012). Also, be sure to add the JNBridge license .dll to your project. Here’s what our references looked like (note that HadoopVis is the name we gave to our Java proxy .dll):

The overall flow of our code is fairly simple: get the JobTracker and NameNode objects, use them to build an internal representation of the cluster, draw that cluster in Visio, and update that drawing with the latest information about the cluster.

In order to get the JobTracker and NameNode objects, we need to connect to the wrapper objects running on our cluster. We do this as follows:

// Connect to the two Java sides
// The jobTracker side is automatically named "default"
JNBRemotingConfiguration.specifyRemotingConfiguration(JavaScheme.binary,
        "obiwan.local", 8085);
// We name the nameNode side "NameNode"
JavaSides.addJavaServer("NameNode", JavaScheme.binary, "obiwan.local", 8087);

Note that we are connecting to two Java sides here even though both are located on the same physical machine (obiwan.local on our cluster). In JNBridgePro, the system needs to know which Java side to communicate with in order to function properly. If you have an object that exists in one remote JVM and you call one of its methods while your code is pointing at a different JVM, your program will crash. To manage this, call JavaSides.setJavaServer() to point to the correct JVM. You’ll see this call sprinkled throughout our code as we switch between the JobTracker and NameNode JVMs.
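When calls alternate between the two JVMs, it is easy to leave the wrong Java side selected. One way to contain this is to scope each switch so the previous selection is restored afterwards, even if the call throws. The Java sketch below models that pattern with a hypothetical JavaSideSelector class that only tracks a side name; it does not talk to JNBridgePro. In the C# app, the equivalent would be a helper that remembers the current name itself, calls JavaSides.setJavaServer() with the new one, and switches back in a finally block.

```java
import java.util.function.Supplier;

// Hypothetical model of "which Java side is selected"; not JNBridgePro's API.
final class JavaSideSelector {
    private String current = "default";

    String current() {
        return current;
    }

    // Runs the action with the named side selected, then restores the
    // previous selection, even if the action throws.
    <T> T withSide(String side, Supplier<T> action) {
        String previous = current;
        current = side;
        try {
            return action.get();
        } finally {
            current = previous;
        }
    }
}
```

With this shape, a block of NameNode calls reads as one withSide("NameNode", ...) call, and code that follows it can assume the JobTracker side is selected again.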

Once we’ve connected to the Java side, we just need to get the objects and build our internal representation of the cluster. The overall program flow looks like this:

JobTracker jobtracker = JnbpJobTrackerWrapper.getJobTracker();
java.util.Collection temp = jobtracker.activeTaskTrackers();
java.lang.Object[] tts = temp.toArray();
JavaSides.setJavaServer("NameNode");
NameNode namenode = JnbpNameNodeWrapper.getNameNode();
HadoopTree.buildTree(jobtracker, "obiwan.local", "obiwan.local", tts,
        namenode.getDatanodeReport(FSConstants.DatanodeReportType.ALL));
// closedFlag is true if a user closes the Visio window in use.
while (!closedFlag)
{
    JavaSides.setJavaServer("NameNode");
    DatanodeInfo[] dnReport = namenode.getDatanodeReport(
            FSConstants.DatanodeReportType.ALL);
    JavaSides.setJavaServer("default");
    HadoopTree.updateTree(jobtracker.activeTaskTrackers().toArray(),
            jobtracker.blacklistedTaskTrackers().toArray(), dnReport);
    System.Threading.Thread.Sleep(3000);
}

The buildTree() and updateTree() methods build and update the internal representation of the cluster and hold it within the HadoopTree class. These methods also invoke the VisioDrawer class, which in turn draws that internal representation in Visio. We’re not going to go into detail here about how the HadoopTree class builds our internal representation of the cluster. The simple tree-building algorithm we use isn’t terribly pertinent to our current discussion, but we encourage you to look at our code, especially if you’re curious about which methods we use to extract information from the JobTracker and NameNode objects (a few of those methods can be seen in the above code snippet). Keep in mind that there are a number of ways to pull information about the cluster from these two objects, and we encourage you to explore the published APIs to figure out how to get the information you want for your app. On a side note, the API for the NameNode isn’t currently published as part of the official Hadoop API, so you’ll have to go back to the source code to figure out what methods to call. The API for the NameNode is also considerably different from that of the JobTracker, so don’t expect similar functionality between the two.
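Although we skip the tree-building algorithm itself, it may help to see roughly what a leaf of the tree needs to hold, based on the fields and methods that updateTree() uses later on (dataActive, taskActive, setHDSpace(), setMapUse(), setReduceUse()). The sketch below is a hypothetical Java version of such a HadoopNode class, not our C# code; in particular, storing slot usage as a 0.0 to 1.0 fraction and guarding against nodes with zero slots are our own assumptions.

```java
// Hypothetical Java version of a cluster-tree leaf: one worker machine
// running both a DataNode and a TaskTracker.
final class HadoopNode {
    final String host;
    boolean dataActive = true;  // false once the DataNode is decommissioned
    boolean taskActive = true;  // false once the TaskTracker is blacklisted
    private double hdSpace;     // HDFS percentage as reported by the DataNode
    private double mapUse;      // occupied / max map slots, 0.0 to 1.0
    private double reduceUse;   // occupied / max reduce slots, 0.0 to 1.0

    HadoopNode(String host) {
        this.host = host;
    }

    void setHDSpace(double percent) { hdSpace = percent; }
    double getHDSpace() { return hdSpace; }

    void setMapUse(int maxSlots, int occupiedSlots) { mapUse = fraction(maxSlots, occupiedSlots); }
    double getMapUse() { return mapUse; }

    void setReduceUse(int maxSlots, int occupiedSlots) { reduceUse = fraction(maxSlots, occupiedSlots); }
    double getReduceUse() { return reduceUse; }

    // Avoids dividing by zero for a node configured with no slots of a kind.
    private static double fraction(int maxSlots, int occupiedSlots) {
        return maxSlots <= 0 ? 0.0 : (double) occupiedSlots / maxSlots;
    }
}
```

In the app, the leaves live in a dictionary keyed by host name, which is how updateTree() looks them up when new reports arrive.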

Drawing Everything in Visio

Once we have an internal representation of the cluster, we need to draw it in Visio to complete our rudimentary monitoring/visualization app. We begin by opening a new instance of Visio, creating a new Document, and adding a new Page:

// Start application and open new document
VisioApp = new Application();
VisioApp.BeforeDocumentClose += new EApplication_BeforeDocumentCloseEventHandler(quit);
ActiveWindow = VisioApp.ActiveWindow;
ActiveDoc = VisioApp.Documents.Add("");
ActivePage = ActiveDoc.Pages.Add();
ActivePage.AutoSize = true;

We then open our custom network template (which we’ve included with the code for your use) and pull out all the Masters we need to draw our diagram of the Hadoop cluster:

// Open visual templates
networkTemplate = VisioApp.Documents.OpenEx(@"$Template_Directory\NetTemplate.vstx",
        (short)VisOpenSaveArgs.visOpenHidden);
Document pcStencil = VisioApp.Documents["COMPME_M.VSSX"];
Document networkStencil = VisioApp.Documents["PERIME_M.VSSX"];
Shape PageInfo = ActivePage.PageSheet;
PageInfo.get_CellsSRC((short)VisSectionIndices.visSectionObject,
        (short)VisRowIndices.visRowPageLayout,
        (short)(VisCellIndices.visPLOPlaceStyle)).set_Result(VisUnitCodes.visPageUnits,
                (double)VisCellVals.visPLOPlaceTopToBottom);
PageInfo.get_CellsSRC((short)VisSectionIndices.visSectionObject,
        (short)VisRowIndices.visRowPageLayout,
        (short)(VisCellIndices.visPLORouteStyle)).set_Result(VisUnitCodes.visPageUnits,
                (double)VisCellVals.visLORouteFlowchartNS);
ActivePage.SetTheme("Whisp");

// Get all the master shapes
masterNode = pcStencil.Masters.get_ItemU("PC");
slaveNode = networkStencil.Masters.get_ItemU("Server");
rack = networkStencil.Masters.get_ItemU("Switch");
dynamicConnector = networkStencil.Masters.get_ItemU("Dynamic connector");

// Open data visualization template and shape
slaveBase = VisioApp.Documents.OpenEx(@"$Template_Directory\DataGraphicTemplate.vsdx",
        (short)Microsoft.Office.Interop.Visio.VisOpenSaveArgs.visOpenHidden);
slaveDataMaster = slaveBase.Pages[1].Shapes[1].DataGraphic;

There are two important things in this snippet that don’t crop up in a lot of examples using Visio. First are these two statements:

PageInfo.get_CellsSRC((short)VisSectionIndices.visSectionObject,
        (short)VisRowIndices.visRowPageLayout,
        (short)(VisCellIndices.visPLOPlaceStyle)).set_Result(VisUnitCodes.visPageUnits,
                (double)VisCellVals.visPLOPlaceTopToBottom);
PageInfo.get_CellsSRC((short)VisSectionIndices.visSectionObject,
        (short)VisRowIndices.visRowPageLayout,
        (short)(VisCellIndices.visPLORouteStyle)).set_Result(VisUnitCodes.visPageUnits,
                (double)VisCellVals.visLORouteFlowchartNS);

These statements tell Visio how to lay out the diagram as it’s being drawn. The first tells Visio to place shapes from top to bottom (with the master nodes on top and the slave nodes on the bottom), while the second tells Visio to route connectors in a flowchart style (we found this to be the most logical view of the cluster). Logically, what we’re doing is editing two values in the current Page’s ShapeSheet that Visio refers to when making layout decisions for that Page.

Second, we want to draw your attention to these lines:

slaveBase = VisioApp.Documents.OpenEx(@"$Template_Directory\DataGraphicTemplate.vsdx",
        (short)Microsoft.Office.Interop.Visio.VisOpenSaveArgs.visOpenHidden);
slaveDataMaster = slaveBase.Pages[1].Shapes[1].DataGraphic;

This code opens a prior Visio project (that we’ve also included with our code) where we’ve simply tied a series of DataGraphics to a single Shape. These DataGraphics can then be scraped from the old project and tied to Shapes in our new project. Our prefabricated DataGraphics are used to display information about individual nodes in the cluster including HDFS space, Mappers/Reducers in use, and overall status of the TaskTracker and DataNode. We have to create these DataGraphics ahead of time since they can’t be created programmatically.

We can then draw the cluster on the Page that we’ve created. Again, we are going to skip over this portion of the process since it is largely standard Visio code. The cluster representation is drawn mostly using the Page.DropConnected() method, and since we’ve already told Visio how to format the drawing, we don’t need to mess with its layout too much. All we have to do is call Page.Layout() once all the Shapes have been drawn to make sure everything is aligned correctly.

The last interesting bit we want to touch on is updating the drawing with the most recent data from the cluster. First we need to get the latest data from the cluster and update our internal representation of the cluster:

public static void updateTree(Object[] taskNodeInfo, Object[] deadTaskNodes,
        DatanodeInfo[] dataNodeInfo)
{
    // DatanodeInfo objects live in the NameNode's JVM
    JavaSides.setJavaServer("NameNode");
    foreach (DatanodeInfo dn in dataNodeInfo)
    {
        HadoopNode curNode;
        // Skip hosts that were not in the tree when it was built
        if (!leaves.TryGetValue(dn.getHostName(), out curNode))
        {
            continue;
        }
        if (dn.isDecommissioned())
        {
            curNode.dataActive = false;
        }
        else
        {
            curNode.setHDSpace(dn.getRemainingPercent());
            curNode.dataActive = true;
        }
    }
    // TaskTrackerStatus objects live in the JobTracker's JVM
    JavaSides.setJavaServer("default");
    foreach (TaskTrackerStatus tt in taskNodeInfo)
    {
        HadoopNode curNode;
        if (!leaves.TryGetValue(tt.getHost(), out curNode))
        {
            continue;
        }
        curNode.setMapUse(tt.getMaxMapSlots(), tt.countOccupiedMapSlots());
        curNode.setReduceUse(tt.getMaxReduceSlots(), tt.countOccupiedReduceSlots());
        curNode.taskActive = true;
    }
    // Blacklisted TaskTrackers are shown as failed
    foreach (TaskTrackerStatus tt in deadTaskNodes)
    {
        HadoopNode curNode;
        if (leaves.TryGetValue(tt.getHost(), out curNode))
        {
            curNode.taskActive = false;
        }
    }
    VisioDrawer.updateData(leaves);
}

Once the data has been gathered, the Visio drawing is updated:

public static void updateData(Dictionary<string, HadoopNode> leaves)
{
    // Section 243 is VisSectionIndices.visSectionProp (the Shape Data section),
    // and cell 0 is a row's Value cell. The row indices (0, 12, 17, 18, 20) are
    // the Shape Data rows we bound to the DataGraphics when building them by hand.
    foreach (KeyValuePair<string, HadoopNode> l in leaves)
    {
        HadoopNode leaf = l.Value;
        Shape leafShape = leaf.getShape();
        // Update HDFS information
        if (leaf.dataActive)
        {
            leafShape.get_CellsSRC(243, 20, 0).set_Result(0, leaf.getHDSpace());
            leafShape.get_CellsSRC(243, 0, 0).set_Result(0, 1);
        }
        // If the DataNode has failed, turn the bottom checkmark to a red X
        else
        {
            leafShape.get_CellsSRC(243, 0, 0).set_Result(0, 0);
        }
        // Update MapReduce information
        if (leaf.taskActive)
        {
            leafShape.get_CellsSRC(243, 17, 0).set_Result(0, leaf.getMapUse());
            leafShape.get_CellsSRC(243, 18, 0).set_Result(0, leaf.getReduceUse());
            leafShape.get_CellsSRC(243, 12, 0).set_Result(0, 1);
        }
        // If the TaskTracker has failed, turn the bottom checkmark to a red X
        else
        {
            leafShape.get_CellsSRC(243, 12, 0).set_Result(0, 0);
        }
    }
}

Logically, we are just changing certain values in each Shape’s ShapeSheet that are tied to the DataGraphics we added earlier. Which ShapeSheet cells correspond to which DataGraphic had to be decided in advance, when we created the DataGraphics by hand, so that we can address those indices directly in our code.
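Because those row indices only make sense relative to the hand-built DataGraphic, one option is to give them names and keep the decision of what to write separate from the Visio calls that write it. The Java sketch below restates the branch logic of updateData() that way. The enum names are our own labels for the indices used above, and CellWrite and DataGraphicUpdates are hypothetical classes, not part of the Visio API.

```java
import java.util.ArrayList;
import java.util.List;

// Our names for the Shape Data row indices used in updateData(); they only
// mean something for the hand-built DataGraphic in this lab's template.
enum DataRow {
    DATA_STATUS(0), TASK_STATUS(12), MAP_USE(17), REDUCE_USE(18), HDFS_SPACE(20);

    final int index;

    DataRow(int index) {
        this.index = index;
    }
}

// One pending write: the value destined for a Shape Data row's Value cell.
final class CellWrite {
    final DataRow row;
    final double value;

    CellWrite(DataRow row, double value) {
        this.row = row;
        this.value = value;
    }
}

final class DataGraphicUpdates {
    // Same branches as updateData(): status 1 shows a checkmark, 0 a red X.
    static List<CellWrite> forNode(boolean dataActive, double hdSpace,
                                   boolean taskActive, double mapUse, double reduceUse) {
        List<CellWrite> writes = new ArrayList<>();
        if (dataActive) {
            writes.add(new CellWrite(DataRow.HDFS_SPACE, hdSpace));
            writes.add(new CellWrite(DataRow.DATA_STATUS, 1));
        } else {
            writes.add(new CellWrite(DataRow.DATA_STATUS, 0));
        }
        if (taskActive) {
            writes.add(new CellWrite(DataRow.MAP_USE, mapUse));
            writes.add(new CellWrite(DataRow.REDUCE_USE, reduceUse));
            writes.add(new CellWrite(DataRow.TASK_STATUS, 1));
        } else {
            writes.add(new CellWrite(DataRow.TASK_STATUS, 0));
        }
        return writes;
    }
}
```

The Visio side would then reduce to one loop per shape that passes each write's row index and value to get_CellsSRC(243, ...).set_Result(...), and the decision logic can be checked without Visio running.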

This updating process (as you saw in an earlier code segment) is done in a simple while loop polling system that updates every three seconds. We used this method rather than a callback/event handling strategy largely for ease of implementation. The NameNode and JobTracker classes don’t implement a listener interface for notifying when values change. As a result, in order to add this functionality, we would have to do significantly more Hadoop hacking than we’ve already done. We could also implement an asynchronous update system in pure C# that would use events to notify the graphic to update, but that would still require polling the Java side for changes somewhere within our program flow. Such a system would lighten the load on Visio by decreasing the number of times we draw to the Page, but wouldn’t increase efficiency overall. While both ways of implementing callbacks are interesting exercises, they’re somewhat outside the scope of this lab.
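As a middle ground between the plain three-second loop and a full callback system, the app could keep polling but redraw only when the data has actually changed, which cuts down on writes to the Page. The Java sketch below shows that idea with a hypothetical ChangePoller class built on the standard ScheduledExecutorService. It assumes the snapshot type implements equals() (for example, a map from host name to node metrics), and it does not talk to Hadoop or Visio.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical poller: fetches a snapshot on a fixed delay and calls the
// redraw callback only when the snapshot differs from the previous one.
final class ChangePoller<S> {
    private final Supplier<S> fetch;
    private final Consumer<S> redraw;
    private S last;

    ChangePoller(Supplier<S> fetch, Consumer<S> redraw) {
        this.fetch = fetch;
        this.redraw = redraw;
    }

    // One polling step; returns true if a redraw was triggered.
    synchronized boolean pollOnce() {
        S current = fetch.get();
        if (Objects.equals(current, last)) {
            return false; // nothing changed, skip the drawing work
        }
        last = current;
        redraw.accept(current);
        return true;
    }

    // Runs pollOnce() every periodMillis on one background thread.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleWithFixedDelay(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
        return exec;
    }
}
```

A real poller should catch and log exceptions from the fetch step inside the task, because a ScheduledExecutorService suppresses all later runs of a periodic task once one run throws.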

The Result

For our small, four-virtual-machine cluster, this is the result (as you saw above):

[Screenshot of the visualizer]

Here a Map/Reduce job is running such that 100% of the Mappers are in use and none of the Reducers are being used yet. Also, notice that the middle worker node has used up almost all of its local HDFS space. That should probably be addressed.

For larger, enterprise-size clusters, Visio will likely become a less viable option for handling the visualization, but for our proof-of-concept purposes it works just fine. For those larger clusters, building a visualizer with WPF would probably be the better answer for a .NET-based solution.

We hope this lab has been a springboard for your own ideas related to creating Hadoop monitoring/visualization applications.

~Ian Heinzman

Building an Excel add-in for HBase MapReduce

Summary

This latest project from JNBridge Labs investigates building an Excel add-in for Hadoop HBase. Because HBase is a Java framework, HBase applications must use its Java APIs, which results in single-platform solutions. A cross-platform solution that integrates HBase, particularly one that provides business intelligence on the desktop through a tool like Microsoft Excel, cannot use the HBase remote client API directly. That means falling back on a lower-level interoperability mechanism, such as implementing a .NET Thrift client. The current project uses JNBridgePro for .NET-to-Java interoperability. It also leverages concepts and code from the previous lab, Building a LINQ provider for HBase MapReduce, which investigated a LINQ extension for HBase.

Introduction

Hadoop allows businesses to quickly analyze very large data sets. Hadoop can reduce ludicrous amounts of data to a meaningful answer in a short amount of time; however, without understanding the shape of your data, you run the risk of garbage in, garbage out. Analysis itself is an iterative process that relies on investigation. Tools that aid data investigation provide a means to quickly view, sort, filter/reduce and represent data, making it possible to quickly find and understand patterns, trends and relationships.

Microsoft Excel has always been the ubiquitous off-the-shelf tool for data analysis, and it makes a ready-to-go front end for Hadoop. Excel can be extended using add-ins developed in Visual Studio with VSTO (Visual Studio Tools for Office). This lab will explore a simple Excel front end to HBase MapReduce. The front end will allow a user to view HBase tables and execute MapReduce jobs. The goal is to make the add-in generic with respect to the column definitions and data in an HBase table.

Getting Started

The components required for this lab are identical to those required in the previous lab, Building a LINQ provider for HBase MapReduce. Here’s a quick list of the components.

  1. Apache Hadoop Stack (see the previous lab’s Getting Started section for more information)
  2. Visual Studio 2012
  3. Eclipse
  4. JNBridgePro 7.0
  5. Office Developer Tools for Visual Studio 2012 (this includes VSTO).
  6. Microsoft Office 2010

Calling Java from .NET: Creating proxies using JNBridgePro

Since the Excel add-in is written in C#/.NET and needs to call several Java class APIs, the first step is to use the JNBridgePro plug-in for Visual Studio to create an assembly of proxies that represent the Java API. When a proxy of a Java class is instantiated in .NET, the real Java object is instantiated in the Java Virtual Machine. The JNBridgePro run-time manages communication between the .NET CLR and the JVM, including invoking methods and synchronizing garbage collection.

For this development step, as well as during run-time, a number of Hadoop, HBase and ZooKeeper JAR files must be available on the Windows machine. These can be scraped from a machine running the Hadoop stack (look in /usr/lib/hadoop/lib, /usr/lib/hbase/lib, etc.).

This is a screen shot of the Edit Class Path dialog for the JNBridgePro Visual Studio plug-in.

These are the JAR files required to create the .NET proxies. During run-time, three additional JAR files must be included in the JVM’s class path when initiating the bridge between the JVM and the CLR: avro-1.5.4.jar, commons-httpclient-3.1.jar and slf4j-nop-1.6.1.jar (the last JAR file suppresses logging by Hadoop and HBase).

Below is a screenshot of the JNBridgePro proxy tool in Visual Studio. The left-hand pane shows all the namespaces found in the JAR files shown in the above dialog. The required namespaces are org.apache.hadoop.hbase.client and org.apache.hadoop.hbase.filter. In addition, individual classes like org.apache.hadoop.hbase.HBaseConfiguration are required (see the link at the end of this blog to download the source).

 

By clicking on the Add+ button, the chosen classes, as well as every dependent class, will be found and displayed in the center pane. The right-hand pane displays the public members and methods of the Java HTable class. The last step is to build the proxy assembly, DotNetToJavaProxies.dll.

Creating and populating an HBase Table

We need an HBase table loaded with data to test calling various HBase Java APIs from .NET. The simple data will consist of an IP address, like “88.240.129.183”, and the requested web page, for example “/zebra.html”. This lab will use the same table, access_logs, created for the previous lab, Building a LINQ provider for HBase MapReduce. Please see the previous lab’s section, Creating and populating an HBase Table, for the code used to build this table.

Building an Excel add-in

The Excel add-in will consist of a single control pane. As the user interacts with the pane, underlying code accesses the Excel data model consisting of workbooks, worksheets and charts. Here’s what the completed add-in looks like.

The class HBasePane is a .NET User Control. It consists of two groups, View Table and Map Reduce. The above screenshot shows the user controls labeled Zookeeper Host, Table Name and Number of Records, which all have user-entered values. By clicking the View Records button, the user has loaded 20 rows from the HBase table access_logs.

Here’s the handler code for the button click event.

        private void viewTableButtonClick(object sender, EventArgs e)
        {
            Excel.Worksheet activeWorksheet 
                 = ((Excel.Worksheet)Globals.ExcelHBaseAddIn.Application.ActiveSheet);
            activeWorksheet.Name = "Records";
            Excel.Range navigator = activeWorksheet.get_Range("A1");
            int numRows = Decimal.ToInt32(this.numberOfRecords.Value);
            // most of the work is done here: read the rows and write them to the sheet
            this.columns = ViewHBaseTable.populateWorkSheet(navigator
                , this.hostName.Text
                , this.tableName.Text
                , numRows);
            // autofit the range
            int numCols = this.columns.Count<string>();
            Excel.Range c1 = activeWorksheet.Cells[1, 1];
            Excel.Range c2 = activeWorksheet.Cells[numRows, numCols];
            this.cols = activeWorksheet.get_Range(c1, c2); 
            this.cols.EntireColumn.AutoFit();
            // populate the user controls with the column names, first clearing
            // any names left over from a previous click
            this.filterComboBox.Items.Clear();
            this.filterComboBox.Items.AddRange(this.columns);
            this.frequencyComboBox.Items.Clear();
            this.frequencyComboBox.Items.AddRange(this.columns);
        }

All the work is done in the method ViewHBaseTable.populateWorkSheet(). The user controls are hostName, tableName and numberOfRecords. The hostName control contains the address of the machine that’s running ZooKeeper, which is responsible for managing connections from the HBase client API. Below is code from populateWorkSheet(). Notice that the HBase table column family and cell names are obtained using the methods getFamily() and getQualifier(), and the cell values using getValue(). The method returns an array of strings holding the family:cell column names in the table. These are used to populate the combo box controls filterComboBox and frequencyComboBox in the Map Reduce group.

            Configuration hbaseConfig = HBaseConfiguration.create();
            hbaseConfig.set("hbase.zookeeper.quorum", hostName);
            try
            {
                HTable tbl = new HTable(hbaseConfig, tableName);
                Scan scan = new Scan();
                ResultScanner scanner = tbl.getScanner(scan);
                Result r;
                while (((r = scanner.next()) != null) && ndx++ < numRecords)
                {
                    List aList = r.list();
                    ListIterator li = aList.listIterator();
                    while (li.hasNext())
                    {
                        kv = (KeyValue)li.next();
                        familyName = Bytes.toString(kv.getFamily());
                        cellName = Bytes.toString(kv.getQualifier());
                        value = Bytes.toString(kv.getValue());
                        // make a unique list of all the column names
                        if (!names.Contains(familyName + ":" + cellName))
                        {
                            names.Add(familyName + ":" + cellName);
                        }
                        // add headers
                        if (currentRow == 2)
                        {
                            currentCell = navigator.Cells[1, currentColumn];
                            currentCell.Value2 = cellName;
                        }
                        currentCell = navigator.Cells[currentRow, currentColumn++];
                        currentCell.Value2 = value;
                    }
                    currentRow++;
                    currentColumn = 1;
                }
                scanner.close();
                tbl.close();
            }
            catch (Exception)
            {
                // "throw;" rethrows without resetting the stack trace
                throw;
            }
            return names.ToArray<string>();
        }
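populateWorkSheet() builds its list of unique family:cell names with List.Contains(), which scans the whole list once per cell. That is harmless for a handful of columns, but a set states the intent more directly. The Java sketch below is not part of the lab's source. CellRef is a hypothetical stand-in for an HBase KeyValue. The sketch does the same bookkeeping with a LinkedHashSet, which ignores duplicates in constant time and keeps first-seen order, so the combo boxes would still list columns in scan order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an HBase cell; only its family and qualifier matter here.
final class CellRef {
    final String family;
    final String qualifier;

    CellRef(String family, String qualifier) {
        this.family = family;
        this.qualifier = qualifier;
    }
}

final class ColumnNames {
    // Returns each distinct "family:qualifier" name once, in first-seen order.
    static List<String> uniqueNames(List<CellRef> cells) {
        // LinkedHashSet: constant-time duplicate check, iteration follows insertion order
        Set<String> names = new LinkedHashSet<>();
        for (CellRef c : cells) {
            names.add(c.family + ":" + c.qualifier); // add() ignores names already present
        }
        return new ArrayList<>(names);
    }
}
```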

Generic filtering and frequency user interface

Below is a close-up screenshot of the HBase pane. The interface in the View Table group lets the user point to a Hadoop implementation, choose a table, and set the number of records to load into the active worksheet. Once that is done, the user can define a MapReduce job using the controls in the Map Reduce group.

The user interface allows filtering on any one column. The combo box labeled Choose filter column contains all the column names in the form family:cell. The text box labeled FilterValue holds the filter value; rows where the chosen column doesn't match it are left out. The combo box labeled Column to Count chooses the column whose values will be grouped and counted. The values shown above ask the question: "Which pages did the IP address 80.240.129.183 visit, and how often was each one visited?"
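To make the question concrete, here is a small in-memory Java sketch of what the Map Reduce button asks the cluster to compute. It is an illustration only. Rows are hypothetical maps from family:cell names to values, and everything runs in one loop instead of on Hadoop. The equality check stands in for the filter applied to the HBase scan. Emitting a count of one per surviving row is the mapper's role, and summing those ones per value is the reducer's.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FilteredFrequency {
    // Keeps rows whose filterColumn equals filterValue, then counts each value of countColumn.
    // Rows are hypothetical maps from "family:qualifier" to the cell's string value.
    static Map<String, Integer> count(List<Map<String, String>> rows,
                                      String filterColumn, String filterValue, String countColumn) {
        Map<String, Integer> totals = new TreeMap<>(); // sorted by key, as rows come back from a scan
        for (Map<String, String> row : rows) {
            // filter step: stands in for the scan's column-value filter
            if (!filterValue.equals(row.get(filterColumn))) continue;
            String key = row.get(countColumn);
            if (key == null) continue; // row lacks the column being counted
            // mapper emits (key, 1); merge() sums the ones per key like the reducer
            totals.merge(key, 1, Integer::sum);
        }
        return totals;
    }
}
```

Swapping the filter and count columns turns the same function into the /cats.html question asked later in this post.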

When the Map Reduce button is clicked, this handler is invoked:

        private void onMapRedButtonClick(object sender, EventArgs e)
        {
            this.filterColumn = this.filterComboBox.Text;
            this.filterValue = this.filterValueTextBox.Text;
            this.frequencyColumn = this.frequencyComboBox.Text;
            Excel.Worksheet activeWorksheet 
                = ((Excel.Worksheet)Globals.ExcelHBaseAddIn.Application.Worksheets[2]);
            activeWorksheet.Name = "Frequency";
            Excel.Range navigator = activeWorksheet.get_Range("A1");
            // most of the fun stuff happens here
            int numRows = MapReduce.executeMapReduce(navigator
                , this.filterColumn
                , this.filterValue
                , this.frequencyColumn
                , this.hostName.Text
                , this.tableName.Text);
            // autofit the range
            Excel.Range c1 = activeWorksheet.Cells[1, 1];
            Excel.Range c2 = activeWorksheet.Cells[numRows, 2];
            this.cols = activeWorksheet.get_Range(c1, c2); 
            this.cols.EntireColumn.AutoFit();
            // bring the worksheet to the top
            activeWorksheet.Activate();
        }

All the work is done by the method MapReduce.executeMapReduce(), partially shown below. The .NET-to-Java method call, HBaseToLinq.FrequencyMapRed.executeMapRed(), invokes almost the same Java code used in the previous lab, Building a LINQ provider for HBase MapReduce. The only modifications remove the hard-coded host, table and column names. In their place, the code uses the host and table entered by the user and the filter and frequency columns the user chose. The method then scans the results of the MapReduce job, which are stored in the table summary_user. It loads them into a worksheet and returns the number of records in the results table.

            try
            {
                HBaseToLinq.FrequencyMapRed.executeMapRed(hostName
                    , tableName
                    , frequencyColumn
                    , columnToFilter
                    , filterValue);
            }
            catch (Exception)
            {
                // "throw;" rethrows without resetting the stack trace
                throw;
            }
            Configuration hbaseConfig = HBaseConfiguration.create();
            hbaseConfig.set("hbase.zookeeper.quorum", hostName);
            try
            {
                string cellName = 
                     frequencyColumn.Substring(frequencyColumn.IndexOf(":") +1);
                string familyName = 
                     frequencyColumn.Substring(0, frequencyColumn.IndexOf(":"));
                HTable tbl = new HTable(hbaseConfig, "summary_user");
                Scan scan = new Scan();
                ResultScanner scanner = tbl.getScanner(scan);
                Result r;
                while ((r = scanner.next()) != null)
                {
                    rowKey = Bytes.toString(r.getRow());
                    count = Bytes.toInt(r.getValue(Bytes.toBytes(familyName)
                         , Bytes.toBytes("total")));
                    currentCell = navigator.Cells[currentRow, currentColumn++];
                    currentCell.Value2 = rowKey;
                    currentCell = navigator.Cells[currentRow++, currentColumn];
                    currentCell.Value2 = count;
                    currentColumn = 1;
                }
                scanner.close();
                tbl.close();
            }
            catch (Exception)
            {
                // "throw;" rethrows without resetting the stack trace
                throw;
            }
            return currentRow - 1;
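One fragile spot in executeMapReduce() is splitting the frequency column into family and cell names. If the text has no colon, IndexOf() returns -1 and Substring(0, -1) throws ArgumentOutOfRangeException. The Java sketch below does the same split with an explicit check. ColumnName is a hypothetical helper, not part of HBase or the lab's source. It also rejects an empty family name, while still allowing an empty qualifier.

```java
final class ColumnName {
    final String family;
    final String qualifier;

    private ColumnName(String family, String qualifier) {
        this.family = family;
        this.qualifier = qualifier;
    }

    // Splits "family:qualifier" at the first colon, like the IndexOf/Substring calls above,
    // but fails with a clear message when the colon is missing or the family is empty.
    static ColumnName parse(String column) {
        int colon = column.indexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("expected family:qualifier but got \"" + column + "\"");
        }
        return new ColumnName(column.substring(0, colon), column.substring(colon + 1));
    }
}
```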

Here’s a screen shot of the Excel add-in after performing the MapReduce.

Visualizing data

Data visualization through graphs and charts is an important final step when investigating and analyzing data. Clicking the Chart Frequencies button makes the add-in create a stacked column chart of the Frequency worksheet. Here's the code for the handler, onChartFrequenciesClick().

        private void onChartFrequenciesClick(object sender, EventArgs e)
        {
            Excel.Workbook wb = Globals.ExcelHBaseAddIn.Application.ActiveWorkbook;
            Excel.Chart chart = (Excel.Chart)wb.Charts.Add();
            chart.ChartType = Excel.XlChartType.xlColumnStacked;
            chart.SetSourceData(this.cols, Excel.XlRowCol.xlColumns);
            chart.HasTitle = true;
            string filterName = this.filterColumn.Substring(this.filterColumn.IndexOf(":") + 1);
            string frequencyName 
                 = this.frequencyColumn.Substring(this.frequencyColumn.IndexOf(":") + 1);
            chart.ChartTitle.Text = "Frequency of " 
                  + frequencyName 
                  + " when " + filterName 
                  + " = " + this.filterValue;
        }

This screen shot of the add-in shows the resulting chart. Notice that the MapReduce columns for filtering and frequency are different from those in the previous example. Here, the question being asked is "How often did each IP address visit the page /cats.html?"

Conclusion

Building an Excel add-in that can view any HBase table of column families and run filtered MapReduce frequency counts is relatively straightforward. Using JNBridgePro to create .NET proxies of the HBase Java client APIs is key to that simplicity. By keeping the MapReduce job both on the Java side and generic, any table can be filtered and reduced to frequencies of one particular column.

The source for this example can be downloaded here.

Build 2012 Recap

We were in Redmond last week for the Build conference, where Microsoft offered deep dives into their latest technologies. Last year the emphasis was on in-depth looks at lower-level technologies like Windows RT, .NET 4.5, and Windows 8 internals. This year's conference concentrated on higher-level, application-oriented APIs like Microsoft Account (formerly Windows Live ID) and Windows Store, as well as technologies more peripheral to us, like Windows Phone 8. In fact, if we had seen the list of sessions in advance, we might have decided to skip the conference and watch any relevant sessions online (although it was nice to get the Surface RT, Nokia Lumia 920, and 100GB of SkyDrive capacity given to all attendees). Even so, there were a number of interesting sessions relevant to our work on interoperability. (All Build sessions can be seen here: http://channel9.msdn.com/events/build/2012.)

There was an interesting session that unveiled details of Microsoft's previously announced Hadoop on Windows Azure offering (now called HDInsight). Since the offering has been by invitation only, few details have been available. It's interesting to contrast Microsoft's approach to Hadoop/.NET integration, which uses .NET streaming but conceals it with the artful use of wrappers, with our approach of direct API calls through JNBridgePro (here and here). Each approach can be useful in certain situations.

Microsoft offered more details on their new Windows Azure Virtual Machines, which bring to Windows Azure the capabilities already found in Amazon's EC2. Microsoft claims advantages over Amazon's offerings, particularly in the areas of administration and automation. For us and for our users, this is interesting because it makes it even easier to create applications that use JNBridgePro and deploy them to Windows Azure. This was possible before, but setting up and starting the applications in the cloud involved a number of complexities. Now it's as easy in Windows Azure as it has been with Amazon EC2. In addition, Microsoft will be offering virtual machine images containing BizTalk Server 2010 R2 CTP, and you will be able to use JNBridge's JMS adapter for BTS with those images.

A talk on the evolution of .NET covered both the history of the platform, including all of the earlier milestones, and possible future directions in which the platform can go. The speaker made the very interesting point that the typical PC of 1998 (when the .NET project began) or even 2000 (when it was unveiled) is very different from the typical PC of today, in terms of processing power, memory and storage, user interface, and connectivity, and any future .NET implementations will need to reflect that. We can only wonder what that will entail, but it’s encouraging to learn that Microsoft still considers .NET to be an essential platform for their future offerings.

One of the more surprising things we learned had to do with Windows Phone 8, which we really hadn't been tracking, since it didn't seem relevant to our mission. Windows Phone 8's runtime is actually a version of the .NET CLR called CoreCLR, which is really based on the existing Silverlight CLR. We haven't supported Silverlight, both because of its slow adoption and because it has been constrained in what it can do. However, we were interested to learn that, in response to requests from developers, the CoreCLR will allow Windows Phone 8 applications to access existing native (that is, C++) gaming engines. Since Java Runtime Environments are also native C++ libraries, does that mean that a JVM can be hosted in a Windows Phone 8 app's process? If so, it might be possible to support shared memory interoperability in Windows Phone 8 applications. It's certainly something we'll be looking into. Will it be possible to do something similar in Windows 8 "Metro" apps? That remains to be seen.

Did you attend Build, or watch sessions online? Did you see something you'd like to call our attention to? If so, please let us know in the comments.

Building a LINQ provider for HBase MapReduce

Summary

HBase is a distributed, scalable, big data storage and retrieval system developed as part of the Apache Hadoop project. Because HBase is a Java framework, HBase applications must use Java APIs, resulting in single-platform solutions. Cross-platform solutions are currently not supported. These include front-end data query, analysis and presentation tools like Microsoft Excel, and query languages like LINQ, the .NET Language Integrated Query framework.

This latest project from JNBridge Labs explores building a simple .NET LINQ provider for HBase using the Java API for creating, managing and scanning HBase tables and the .NET LINQ provider interfaces. In addition, the project investigates LINQ support for HBase MapReduce jobs written in Java by adding an extension to the LINQ query syntax. 

By continuing the research into Hadoop and .NET interoperability introduced in the previous project, Creating .NET-based Mappers and Reducers for Hadoop, JNBridge Labs champions interoperability between Windows and any Java solution running on any system. The potential of products like DryadLINQ (canceled when Microsoft shelved the Dryad HPC project in favor of Hadoop) is still relevant, but only if that potential isn't constrained to Hadoop running on particular systems. Microsoft's port of Hadoop from Linux to Azure and Windows Server is tied to a single platform. Interoperability between Hadoop and LINQ or Excel means supporting Hadoop on any system.

Introduction

Apache HBase is an implementation of Google's Bigtable design on the Apache Hadoop platform. In addition to providing distributed, parallel processing of petabyte-size data sets, Hadoop provides the distributed file system, HDFS, where HBase tables are stored. An HBase table is a distributed, multi-dimensional sorted map containing structured data. A web access record is the obvious canonical example:

125.125.125.125 - - [10/Oct/2011:21:15:05 +0500] "GET /index.html HTTP/1.0" 200 1043 "http://www.ibm.com/" "Mozilla/4.05 [en] (WinNT; I)"

While the above record can be thought of as a row with several columns, the point is that it's in a single table; there's no relationship to other tables because there are no other tables. Nor is there a need to worry about indexes. Designing schemas that enable efficient queries isn't the point. That's because a row isn't an object like an employee or a bank account; it's a single datum that only has value as part of a data set, which can have several million rows. The queries one makes against these tables tend to be reductions to simple statistics, e.g. sums. In other words, we're talking brute force, something that Apache Hadoop, MapReduce algorithms and parallel processing make straightforward and fast.
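As an illustration of how such a record maps onto an HBase row, the Java sketch below extracts the two fields this lab stores from a line in Apache's combined log format: the client IP address and the requested page. It is a hypothetical helper. The lab itself generates random IP/page pairs rather than parsing real logs, and the regular expression assumes well-formed lines.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AccessLogLine {
    // client IP, ident and user (ignored), [timestamp], then the quoted request line;
    // only the IP and the requested path are captured
    private static final Pattern LINE =
        Pattern.compile("^(\\S+) \\S+ \\S+ \\[[^\\]]+\\] \"\\S+ (\\S+)[^\"]*\".*$");

    final String ip;
    final String page;

    private AccessLogLine(String ip, String page) {
        this.ip = ip;
        this.page = page;
    }

    // Returns null when the line does not look like a Common/Combined Log Format entry.
    static AccessLogLine parse(String line) {
        Matcher m = LINE.matcher(line);
        return m.matches() ? new AccessLogLine(m.group(1), m.group(2)) : null;
    }
}
```

Each parsed line would become one row, with ip and page as two cells in the same column family.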

LINQ, or Language INtegrated Query, is a Microsoft .NET feature that provides query capabilities in the .NET languages. Strictly speaking, the query syntax is syntactic sugar around the static query extension methods defined in the class System.Linq.Enumerable. These methods can be used to query any .NET collection that implements IEnumerable<T>. LINQ is also extensible; it's possible to implement a LINQ provider for just about anything as long as there's a convenient abstraction that allows queries. In the case of HBase, a LINQ provider makes perfect sense.

This current offering from JNBridge Labs will explore building a simple LINQ provider for HBase using the HBase client Java API for creating, managing and scanning tables. The LINQ provider will also support HBase MapReduce jobs written in Java using the HBase table mapper/reducer templates and Hadoop. This differs from the previous Hadoop lab, which demonstrated mappers and reducers written in .NET but called by the MapReduce job on the Java side.

Getting Started

Here are the components required for the lab, along with some important links, resources and configuration tips.

Apache Hadoop stack

The first requirement is access to a Hadoop/HBase implementation. Installing and configuring Hadoop and HBase can be quite an exercise, particularly if the platform is Windows rather than Linux. The best option is to download the free Linux virtual images from Cloudera. The VM runs CentOS 5.8 and contains Cloudera's distribution of the entire Apache Hadoop stack, already installed and configured. While it's not truly distributed among many machines, each component of the Hadoop stack runs as a separate process, so it's possible to add more worker nodes. The VM is available for VMware, KVM and VirtualBox. Make sure that the VM's network connection is bridged; do not use NAT. Java server applications running on Linux are notoriously finicky when it comes to DNS and the loopback interface. For that reason, make sure to modify the /etc/hosts file on the Linux VM and the hosts file on the Windows machine. This ensures that the HBase client API can actually connect to the Hadoop/HBase implementation through ZooKeeper.

On the Linux VM, comment out the loopback entry for 127.0.0.1 and replace it with the IP address of the VM, then restart the VM.

# 127.0.0.1          localhost.localdomain localhost
192.168.1.3          localhost.localdomain localhost

On the Windows development machine, add this line to the hosts file (Windows\System32\drivers\etc\hosts):

192.168.1.3           localhost.localdomain

Development machine

Visual Studio 2010, Eclipse Juno and, of course, JNBridgePro 6.1. JNBridgePro will be used to build .NET proxies of the HBase client API as well as a Java class that implements MapReduce. This will enable calling Java from the .NET code that implements the LINQ provider.

Calling Java from .NET: Creating proxies using JNBridgePro

Since the LINQ provider is written in C#/.NET and needs to call a Java class API, the first step is to use the JNBridgePro plug-in for Visual Studio to create an assembly of proxies that represent the Java API. When a proxy of a Java class is instantiated in .NET, the real Java object is instantiated in the Java Virtual Machine. The JNBridgePro run-time manages the communication between the .NET CLR and the JVM, including invoking methods and synchronizing garbage collection.

For this development step, as well as during run-time, a bunch of Hadoop, HBase and ZooKeeper JAR files must be available on the Windows machine. While these can be obtained by downloading the zip archives from the Apache project, it’s best to use the JAR files from the Cloudera distribution, thus avoiding version-itis. These can be scraped from the VM (look in /usr/lib/hadoop/lib, /usr/lib/hbase/lib, etc.)

This is a screen shot of the Edit Class Path dialog for the JNBridgePro Visual Studio plug-in.

These are the JAR files required to create the .NET proxies. At run time, three additional JAR files must be included in the JVM's class path when initiating the bridge between the JVM and the CLR: avro-1.5.4.jar, commons-httpclient-3.1.jar and slf4j-nop-1.6.1.jar. The last of these suppresses logging by Hadoop and HBase. If you're interested in the volume of information logged during a MapReduce computation, just remove it from the class path.

This is a screen shot of the JNBridgePro proxy tool in Visual Studio. The left hand pane shows all the namespaces found in the JAR files shown in the above dialog. The required namespaces are org.apache.hadoop.hbase.client and org.apache.hadoop.hbase.filter. In addition, individual classes like org.apache.hadoop.hbase.HBaseConfiguration are required (see the link at the end of this blog to download the source).

 

Clicking the Add+ button finds the chosen classes, as well as every class they depend on, and displays them in the center pane. The right-hand pane displays the public members and methods of the Java HTable class. The last step is to build the proxy assembly, HBaseProxies.dll.

Creating and populating an HBase Table

It would be nice to have an HBase table loaded with data, which would also provide an opportunity to test calling various HBase Java APIs from .NET. To keep things simple (after all, this is an example), the data will consist of an IP address, like “88.240.129.183”, and the requested web page, for example “/zebras.html”. The code for creating and populating the table, shown below, is in the project LoadData.

using System;
using org.apache.hadoop.hbase;
using org.apache.hadoop.hbase.client;
using org.apache.hadoop.hbase.util;
using org.apache.hadoop.conf;
// load the JVM into this process (shared memory) and point it at the JNBridgePro runtime and the HBase class path
com.jnbridge.jnbproxy.JNBRemotingConfiguration.specifyRemotingConfiguration(
                    com.jnbridge.jnbproxy.JavaScheme.sharedmem
                    , @"C:\Program Files\Java\jre6\bin\client\jvm.dll"
                    , @"C:\Program Files\JNBridge\JNBridgePro v6.1\jnbcore\jnbcore.jar"
                    , @"C:\Program Files\JNBridge\JNBridgePro v6.1\jnbcore\bcel-5.1-jnbridge.jar"
                    , @"C:\HBase\jars\cdh4Jars\zookeeper-3.4.3-cdh4.0.0.jar;...");

// connect to HBase through ZooKeeper on the Linux VM
Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.zookeeper.quorum", "192.168.1.3");
HBaseAdmin admin = new HBaseAdmin(hbaseConfig);
HTableDescriptor desc = new HTableDescriptor("access_logs");
desc.addFamily(new HColumnDescriptor("details"));
admin.createTable(desc);
HTable htable = new HTable(hbaseConfig, "access_logs");
htable.setAutoFlush(false);
htable.setWriteBufferSize(1024 * 1024 * 12);
int totalRecords = 100000;
Random rand = new Random();
string[] pages = { "/dogs.html", "/cats.html", "/rats.html", "/bats.html", "/zebras.html"
                        , "/t-rex.html", "/lizard.html", "/birds.html", "/beetles.html", "/elephants.html"};
string[] ipAdrs = { "88.240.129.183", "144.122.180.55", "85.100.75.104", "78.34.27.101"
                        , "23.121.45.220", "206.27.34.178", "174.82.96.35", "13.234.26.45"
                        , "245.213.200.10", "167.38.92.14", "27.98.32.45", "22.48.92.13" };
for (int i = 0; i < totalRecords; i++)
{
    Put put = new Put(Bytes.toBytes(i));
    put.add(Bytes.toBytes("details"), Bytes.toBytes("ip")
            , Bytes.toBytes(ipAdrs[rand.Next(ipAdrs.Length)]));
    put.add(Bytes.toBytes("details"), Bytes.toBytes("page")
            , Bytes.toBytes(pages[rand.Next(pages.Length)]));
    htable.put(put);
}
htable.flushCommits();
htable.close();

In the above C# code, the byte arrays produced by the proxied Bytes.toBytes() methods are of type ‘sbyte[]’, not ‘byte[]’. That's because the Java type ‘byte’ is signed, whereas in .NET ‘byte’ is unsigned, so the proxies map Java's byte to .NET's sbyte. Using the HBase shell on the Linux VM, we can dump the first row of the HBase table, access_logs.

The row comprises two cells, ip and page, both grouped under the column family details. A column family is a semantic grouping of cells, hence the prevailing use of the term ‘family’. The hex numbers on the left, both zero, are the row key of the first row. The timestamp adds a third dimension, enabling queries based on a time span.
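Both byte-level details can be checked without a cluster. The Java sketch below uses java.nio.ByteBuffer to produce the same 4-byte big-endian encoding that HBase's Bytes.toBytes(int) uses, so row 0's key is four zero bytes. It also shows why the proxies need sbyte: the byte 0xC8 is -56 in Java but 200 as an unsigned .NET byte. RowKeyBytes is a hypothetical helper, not an HBase class.

```java
import java.nio.ByteBuffer;

final class RowKeyBytes {
    // Same layout as HBase's Bytes.toBytes(int): 4 bytes, most significant first.
    // ByteBuffer is big-endian by default, so no HBase JAR is needed for the sketch.
    static byte[] toBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Inverse of toBytes, in the spirit of Bytes.toInt(byte[]).
    static int toInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Java bytes are signed (-128..127); masking gives the 0..255 value a .NET byte would hold.
    static int unsigned(byte b) {
        return b & 0xFF;
    }
}
```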

Building a custom LINQ Provider

LINQ, in its simplest form, is a set of extension methods to the generic interface IEnumerable<T>. Consider the following class, really no more than a structure:

public class AccessRecord {
    // Properties.
    public string ipAddress { get; set; }
    public string page { get; set; }
    // Constructor.
    internal AccessRecord(string ip, string page) {
        this.ipAddress = ip;
        this.page = page;
    }
}

A collection of AccessRecord instances stored in the generic List<T> class, which implements IEnumerable<T>, can be queried using the static extension methods defined in the class System.Linq.Enumerable. For example, consider determining the frequency (the count) of the pages requested by a given IP address.

List<AccessRecord> records = new List<AccessRecord>();
var query1 = records.Where(rec => rec.ipAddress == "88.240.129.183").OrderBy(rec => rec.page)
             .GroupBy(rec => rec.page)
             .Select(grps => new {ip = grps.Key, count = grps.Count()});

Alternatively, using the syntactic sugar in C#, the query can be stated in this form.

var query1 = from rec in records
             where rec.ipAddress == "88.240.129.183"
             orderby rec.page
             group rec by rec.page into grps
             select new { ip = grps.Key, count = grps.Count() };
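For readers more at home in Java, the same in-memory query maps closely onto a stream pipeline. filter plays the role of where, groupingBy the role of group ... by, and counting() the role of Count(). A TreeMap keeps the pages sorted, as orderby does. This sketch is only a comparison. Like the List<AccessRecord> version, it needs every record in the client's memory.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class PageFrequency {
    // Java counterpart of the AccessRecord class above (fields only).
    static final class AccessRecord {
        final String ipAddress;
        final String page;

        AccessRecord(String ipAddress, String page) {
            this.ipAddress = ipAddress;
            this.page = page;
        }
    }

    // where ipAddress == ip; group by page; count each group; pages kept in sorted order
    static Map<String, Long> pagesVisitedBy(List<AccessRecord> records, String ip) {
        return records.stream()
                .filter(r -> r.ipAddress.equals(ip))
                .collect(Collectors.groupingBy(r -> r.page, TreeMap::new, Collectors.counting()));
    }
}
```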

While this is pretty cool, it will not scale well. The list of access records could number 10,000,000. Moreover, there’s the overhead of building the initial data structure in the first place. That’s why LINQ supplies a framework for building custom query providers. The entire point is to have both the data and the query computations happen someplace else, like HBase.

Implementing IOrderedQueryable<T> and IQueryProvider

A LINQ provider requires implementing two interfaces: IQueryProvider and either IQueryable<T> or IOrderedQueryable<T>. If the LINQ provider needs to support the OrderBy and ThenBy query operators, then the IOrderedQueryable interface must be implemented (actually, because IOrderedQueryable inherits from IQueryable, all three interfaces must be implemented).

The implementation of IOrderedQueryable<T> can be found in the class QueryableHBaseData. The important method in this class is the implementation of IEnumerable<T>.GetEnumerator(). Consider the code that displays the results of the above queries by enumerating over the returned collection.

foreach (var pg in query1)
{
    Console.WriteLine("     page: " + pg.ip + " " + pg.count);
}

The actual query is executed when the GetEnumerator() method is called on the instance of QueryableHBaseData.

public IQueryProvider Provider { get; private set; }
public Expression Expression { get; private set; }

public IEnumerator<TData> GetEnumerator()
{
    return (Provider.Execute<IEnumerable<TData>>(Expression)).GetEnumerator();
}
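The key property here is deferred execution: building the query does nothing, and the work happens only when enumeration starts. The hypothetical Java analogue below uses Iterable in place of IEnumerable<T> and a Supplier in place of Provider.Execute(). It counts executions to make the timing visible. As with a LINQ query, each new enumeration runs the query again.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical analogue of QueryableHBaseData: constructing it runs nothing;
// the query runs only when iteration begins, as GetEnumerator() does in .NET.
final class DeferredQuery<T> implements Iterable<T> {
    private final Supplier<List<T>> execute; // stands in for Provider.Execute(Expression)
    private int executions = 0;

    DeferredQuery(Supplier<List<T>> execute) {
        this.execute = execute;
    }

    @Override
    public Iterator<T> iterator() {
        executions++;                  // every enumeration re-runs the query
        return execute.get().iterator();
    }

    int executions() {
        return executions;
    }
}
```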

The Provider property is the implementation of IQueryProvider, which can be found in the class HBaseQueryProvider. The Expression property is the query to be executed. The HBaseQueryProvider.Execute() method is responsible for executing the query. So far, all of this is straightforward. The hard part in implementing a LINQ provider is parsing, walking and decorating an expression tree represented by an instance of System.Linq.Expressions.Expression.

The innermost-where expression

The reason for walking the expression tree is to find the innermost where expression. A where operator is a filter; it's what makes a query, well, a query. Most custom LINQ providers find the innermost-where expression and partially evaluate its lambda expression argument to retrieve the filter predicate values. The predicate values, along with some other information, are used to query the real data source and return the results as an IQueryable<T>. The expression tree is then rewritten, essentially replacing the original data source with the IQueryable<T> collection that now resides in memory. Finally, the new expression tree is executed by that in-memory collection's own query provider, as the last lines of the method below show. All of this is implemented in the method HBaseQueryContext.Execute().

internal static object Execute(Expression expression, bool IsEnumerable)
{
    // Find the call to Where() and get the lambda expression predicate.
    InnermostWhereFinder whereFinder = new InnermostWhereFinder();
    MethodCallExpression whereExpression = whereFinder.GetInnermostWhere(expression);
    LambdaExpression lambdaExpression = 
             (LambdaExpression)((UnaryExpression)(whereExpression.Arguments[1])).Operand;
    // Send the lambda expression through the partial evaluator.
    lambdaExpression = (LambdaExpression)Evaluator.PartialEval(lambdaExpression);
    // Get the column name(s)
    ColumnFinder cf = new ColumnFinder(lambdaExpression.Body);
    List<string> columns = cf.Columns;  
    // Call HBase and get the results.
    AccessRecord[] records = HBaseHelper.GetColumnsFromHBase(
             columns.First<string>()
             , cf.IsIP, cf.IsPage);    
    IQueryable<AccessRecord> queryableRecords = records.AsQueryable<AccessRecord>();
    // Copy the expression tree that was passed in, changing only the first
    // argument of the innermost MethodCallExpression.
    ExpressionTreeModifier treeCopier = new ExpressionTreeModifier(queryableRecords);
    Expression newExpressionTree = treeCopier.CopyAndModify(expression);
    if (IsEnumerable)
        return queryableRecords.Provider.CreateQuery(newExpressionTree);
    else
        return queryableRecords.Provider.Execute(newExpressionTree);
}

The class ColumnFinder takes the lambda expression that represents the predicate of the where operator and returns the right-hand side of the predicate expression. In our example, the predicate is record.ipAddress == “88.240.129.183”, so the right-hand side is 88.240.129.183. The left-hand side of the predicate expression, the field to test against, is obtained through the properties ColumnFinder.IsIP and ColumnFinder.IsPage. In keeping with the above example queries, this data is used to call HBaseHelper.GetColumnsFromHBase() with the arguments 88.240.129.183, true and false.
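The shape of the rewrite in HBaseQueryContext.Execute() can be shown with a deliberately tiny Java model. Nothing here is the .NET expression API. Expr, Source and Call are hypothetical classes, and lambdas are plain strings. The sketch performs the provider's two structural steps. First, it finds the innermost Where, the one applied directly to the data source. Second, it copies the chain with only the source replaced. As in the C# code, the Where stays in the copied chain and is simply re-applied to the already-filtered in-memory results.

```java
final class QueryRewrite {
    // A query is modeled as a chain of operator calls ending at a data source.
    interface Expr {}

    static final class Source implements Expr {
        final String name;

        Source(String name) {
            this.name = name;
        }
    }

    static final class Call implements Expr {
        final String op;   // "Where", "OrderBy", "GroupBy", "Select", ...
        final Expr input;  // the expression this operator is applied to
        final String arg;  // the lambda, kept as text in this sketch

        Call(String op, Expr input, String arg) {
            this.op = op;
            this.input = input;
            this.arg = arg;
        }
    }

    // Walk from the outermost call toward the source; the last Where seen is the innermost one.
    static Call innermostWhere(Expr e) {
        Call found = null;
        while (e instanceof Call) {
            Call c = (Call) e;
            if (c.op.equals("Where")) {
                found = c;
            }
            e = c.input;
        }
        return found;
    }

    // Copy the chain, swapping the original source for another (e.g. in-memory results),
    // in the spirit of ExpressionTreeModifier.CopyAndModify().
    static Expr replaceSource(Expr e, Source replacement) {
        if (e instanceof Call) {
            Call c = (Call) e;
            return new Call(c.op, replaceSource(c.input, replacement), c.arg);
        }
        return replacement;
    }

    // Render the chain as method-call text, innermost first.
    static String render(Expr e) {
        if (e instanceof Call) {
            Call c = (Call) e;
            return render(c.input) + "." + c.op + "(" + c.arg + ")";
        }
        return ((Source) e).name;
    }
}
```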

Scanning and filtering an HBase table

Now that all the boring stuff is done, we can make some calls to the HBase client Java API from .NET using the proxy assembly. The call to HBaseHelper.GetColumnsFromHBase() results in calling this code.

using System;
using System.Collections.Generic;
using System.Linq;
using org.apache.hadoop.hbase;
using org.apache.hadoop.hbase.client;
using org.apache.hadoop.hbase.util;
using org.apache.hadoop.io;
using org.apache.hadoop.conf;
using org.apache.hadoop.hbase.filter;
// family, ipCol and pageCol are sbyte[] fields of the helper class holding the
// encoded names "details", "ip" and "page".
// column is the value to match; isIP/isPage select which cell it is compared against.
public static AccessRecord[] hbaseColumnScan(string column, bool isIP, bool isPage)
{
    HTable tbl = null;
    ResultScanner rs = null;
    List<AccessRecord> records = new List<AccessRecord>();
    Configuration hbaseConfig = HBaseConfiguration.create();
    hbaseConfig.set("hbase.zookeeper.quorum", "192.168.1.3");
    try
    {
        tbl = new HTable(hbaseConfig, "access_logs");
        Scan scn = new Scan();
        // fetch 500 rows per round trip to the region server
        scn.setCaching(500);
        sbyte[] scanColumn = null;
        if ( isIP )
            scanColumn = ipCol;
        else if ( isPage )
            scanColumn = pageCol;
        // server-side filter: keep only rows whose details:<scanColumn> equals column
        SingleColumnValueFilter svcFilter = new SingleColumnValueFilter(family
            , scanColumn
            , CompareFilter.CompareOp.EQUAL
            , Bytes.toBytes(column));
        scn.setFilter(svcFilter);
        rs = tbl.getScanner(scn);
        for (Result r = rs.next(); r != null; r = rs.next())
        {
            // read both cells so the record is correct whichever column was filtered on
            string ip = Bytes.toString(r.getValue(family, ipCol));
            string pg = Bytes.toString(r.getValue(family, pageCol));
            records.Add(new AccessRecord(ip, pg));
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Unable to perform column scan: " + ex.Message);
    }
    finally
    {
        // release the scanner and table even if the scan failed
        if (rs != null) rs.close();
        if (tbl != null) tbl.close();
    }
    return records.ToArray<AccessRecord>();
}

Testing the LINQ provider

Here’s the example query using the HBase LINQ provider.

QueryableHBaseData<AccessRecord> records = 
                new QueryableHBaseData<AccessRecord>();
var query1 = from record in records
                where record.ipAddress == "88.240.129.183"
                orderby record.page
                group record by record.page into grps
                select new { ip = grps.Key, count = grps.Count() };

Console.WriteLine("  IP address 88.240.129.183 visited these pages");
foreach (var pg in query1)
{
    Console.WriteLine("     page: " + pg.ip + " " + pg.count);
}

This query scans the HBase table for all records where the IP address equals 88.240.129.183. However, to sort, group and count the requested pages, the query uses the standard Enumerable extensions against an array in memory. Considering the potential size of the results from a simple table scan, implementing only the innermost-where operator is probably inefficient. It also doesn't make use of MapReduce, which is almost the whole point of using HBase, especially since the example query is tailor-made for MapReduce.

Using MapReduce in a LINQ query

Since the LINQ provider is already calling Java, it should be simple to write a MapReduce implementation in Java using the HBase table mapper and reducer classes. It also gives us an excuse to write some Java code.

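import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class FrequencyMapRed {

    private static final byte[] family = Bytes.toBytes("details");
    // configuration key that tells the mapper which column to count
    private static final String COUNT_COLUMN_KEY = "frequency.count.column";

    // Emits (value of the counted column, 1) for every row that passes the scan filter.
    static class Mapper1 extends TableMapper<Text, IntWritable> 
    {
        private static final IntWritable one = new IntWritable(1);
        private final Text txt = new Text();
        private byte[] countCol;

        @Override
        protected void setup(Context context)
        {
            countCol = Bytes.toBytes(context.getConfiguration().get(COUNT_COLUMN_KEY, "page"));
        }

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                 throws IOException 
        {      
            txt.set(Bytes.toString(values.getValue(family, countCol)));
            try {
                context.write(txt, one);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }

    // Sums the 1s for each key and writes the total to details:total in the output table.
    public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable> 
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException 
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(family, Bytes.toBytes("total"), Bytes.toBytes(sum));
            context.write(null, put);
        }
    }

    public static void executeMapRed(String columnToCount
           , String filterColumn
           , String filterColumnValue) 
               throws Exception
    { 
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost.localdomain"); // adjust for your cluster
        conf.set(COUNT_COLUMN_KEY, columnToCount); // read by Mapper1.setup()

        Job job = new Job(conf, "Hbase_FreqCounter1");
        job.setJarByClass(FrequencyMapRed.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // rows fetched per RPC
        scan.setCacheBlocks(false);  // don't fill the block cache from a MapReduce scan
        SingleColumnValueFilter svcFilter = new SingleColumnValueFilter(family
                , Bytes.toBytes(filterColumn)
                , CompareFilter.CompareOp.EQUAL
                , Bytes.toBytes(filterColumnValue));
        scan.setFilter(svcFilter);

        TableMapReduceUtil.initTableMapperJob("access_logs", scan, Mapper1.class, Text.class,
                IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("summary", Reducer1.class, job);
        if (!job.waitForCompletion(true)) {
            throw new IOException("Hbase_FreqCounter1 job failed");
        }
    }
}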

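Notice that the results of the MapReduce computation are placed in a table called summary. The table is created once and reused to hold results each time the MapReduce code executes, so it must be created beforehand using the HBase shell, with a column family named details (the family the reducer writes to). Because the table is reused, rows written by an earlier run with a different filter remain in it unless the table is truncated between runs.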
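The following plain-Java sketch (no HBase or Hadoop classes; all names are hypothetical) traces what the job computes: the scan filter keeps the matching rows, Mapper1 emits (value, 1) pairs, the shuffle groups them by key, and Reducer1 produces one total per key, which the real job writes into summary. It models rows as simple maps and assumes every row has both columns; the real SingleColumnValueFilter lets rows that lack the filter column through unless setFilterIfMissing(true) is called.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FrequencyJobSimulation {
    // Hypothetical in-memory model of an access_logs row: column name -> value
    // (both columns live in the "details" family).
    static Map<String, String> row(String ip, String page) {
        Map<String, String> r = new HashMap<>();
        r.put("ip", ip);
        r.put("page", page);
        return r;
    }

    // Scan filter + Mapper1: for each row whose filter column equals the value,
    // emit (value of the counted column, 1). Assumes every row has both columns.
    static List<Map.Entry<String, Integer>> mapPhase(List<Map<String, String>> rows,
            String filterColumn, String filterValue, String countColumn) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (Map<String, String> r : rows) {
            if (filterValue.equals(r.get(filterColumn))) {
                emitted.add(new SimpleEntry<String, Integer>(r.get(countColumn), 1));
            }
        }
        return emitted;
    }

    // Shuffle (group values by key, keys sorted) + Reducer1 (sum each group).
    // The returned map plays the role of the summary table: row key -> details:total.
    static Map<String, Integer> shuffleAndReduce(List<Map.Entry<String, Integer>> emitted) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : emitted) {
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Integer> summary = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            int sum = 0;
            for (int v : group.getValue()) {
                sum += v;
            }
            summary.put(group.getKey(), sum);
        }
        return summary;
    }
}
```

Called with filterColumn "ip", filterValue "88.240.129.183" and countColumn "page", it yields the same per-page counts as the LINQ query; in the real job, the grouping and summing happen in the cluster rather than in the client.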

Adding the Java MapReduce to the proxy assembly

The above class can be archived to a JAR file, FrequencyMapRed.jar. Since the method FrequencyMapRed.executeMapRed() must be called from .NET, the JAR file needs to be added to the class path in the JNBridgePro Visual Studio plug-in. The class can be added to the proxy assembly by using the Add Classes from Classpath dialog. Once the proxy assembly is rebuilt, the new Java method that implements the MapReduce frequency computation can be called from .NET.

Modifying the LINQ provider to support MapReduce

The above MapReduce frequency implementation provides the same result as three LINQ query operators: Where, OrderBy and GroupBy. The innermost-where strategy did not implement the OrderBy and GroupBy operators, and shoe-horning MapReduce into the standard query operators is problematic. The general solution is to optimize the query into a graph of MapReduce jobs, and some query optimizers for big data implementations do exactly that; for the scope of this example, however, it is simpler to add a new query operator called frequency. Adding a new extension method to IEnumerable is easy. The problem is that it would only be available in the method-call form of the query, i.e. Where().Frequency().Select(); it would not be available in the query form, because C# has no query keyword for it. However, the query form does support invoking delegates. Using this strategy leads to the following code, found in the source file LinqFrequency.cs.

public class HBase
{
    // The "frequency" query operator: a delegate that can be invoked from query syntax.
    static public Func<AccessRecord, IEnumerable<KeyValuePair<string, int>>> frequency 
                    = MyFrequency;

    // recs carries the MapReduce arguments rather than real data:
    // recs.ipAddress holds the filter value and recs.page names the filtered column.
    public static IEnumerable<KeyValuePair<string, int>> MyFrequency(AccessRecord recs)
    {
        // start with an empty sequence so SelectMany never receives null
        IEnumerable<KeyValuePair<string, int>> results =
            Enumerable.Empty<KeyValuePair<string, int>>();
        try
        {
            results = executeMapReduce(
                     (recs.page == "ip" ? "page" : "ip")
                     , recs.page, recs.ipAddress);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        return results;
    }

    public static IEnumerable<KeyValuePair<string, int>> executeMapReduce(
                          string frequencyForThisCol
                        , string filterThisCol
                        , string withThisValue)
    {
        // Run the Java MapReduce job through the proxy; it writes its results to "summary".
        FrequencyMapRed.executeMapRed(frequencyForThisCol, filterThisCol, withThisValue);

        // Read the results back: row key = counted value, details:total = count.
        Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "192.168.1.3");
        HTable htable = new HTable(hbaseConfig, "summary");
        ResultScanner scanner = htable.getScanner(new Scan());
        List<KeyValuePair<string, int>> results = new List<KeyValuePair<string, int>>();
        for (Result r = scanner.next(); r != null; r = scanner.next())
        {
            string page = Bytes.toString(r.getRow());
            int count = Bytes.toInt(r.getValue(Bytes.toBytes("details"), Bytes.toBytes("total")));
            results.Add(new KeyValuePair<string, int>(page, count));
        }
        scanner.close();
        htable.close();
        return results;
    }
}

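The above code defines a delegate, frequency, that takes an AccessRecord as its argument and returns an IEnumerable of KeyValuePair<string, int> values, each pairing a page (or IP address) with its count. The delegate runs the Java MapReduce job through the proxy and then reads the results back from the summary table.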
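Why does a delegate make frequency usable from query syntax? The C# compiler translates a second from clause into a call to SelectMany, so `from record in records where ... from count in HBase.frequency(record) select count` becomes, roughly, `records.Where(record => ...).SelectMany(record => HBase.frequency(record), (record, count) => count)`. Java streams have a close analogue in flatMap, and this sketch uses it with a Function-valued field standing in for the delegate. Records are simplified to IP-address strings, and the canned counts returned by the stub are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SelectManySketch {
    // Hypothetical stand-in for the HBase.frequency delegate: a function-valued field.
    // The real delegate runs the MapReduce job; this stub returns canned counts.
    static final Function<String, List<Map.Entry<String, Integer>>> frequency =
            ip -> Arrays.<Map.Entry<String, Integer>>asList(
                    new SimpleEntry<String, Integer>("/about.html", 1),
                    new SimpleEntry<String, Integer>("/index.html", 2));

    // Java analogue of:
    //   from record in records where record == ip
    //   from count in frequency(record) select count
    static List<Map.Entry<String, Integer>> query(List<String> records, String ip) {
        return records.stream()
                .filter(record -> record.equals(ip))                 // Where
                .flatMap(record -> frequency.apply(record).stream()) // SelectMany
                .collect(Collectors.toList());
    }
}
```

One difference: QueryableHBaseData builds an expression tree, so the provider can see the SelectMany call that invokes HBase.frequency before anything runs, which is what the change to HBaseQueryContext.Execute() relies on.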

Using the Where and SelectMany query operators

The Java MapReduce job uses a table scan; in fact, it’s the same scan used to implement the innermost-where LINQ provider. That means the MapReduce job provides the same functionality as the OrderBy and GroupBy query operators, albeit much more efficiently. Therefore, a LINQ query using the frequency delegate defined above does not need those two operators. Strictly speaking, the Where operator isn’t needed either. However, it is convenient to use the Where query operator to define the predicate for the table scan, even though the scan runs wholly on the Java side.

To support this, the method HBaseQueryContext.Execute() needs to be modified to determine whether the SelectMany query operator will invoke the frequency delegate. If there’s no SelectMany operator in the expression tree, or if there is one but it doesn’t invoke the frequency delegate, then HBaseHelper.GetColumnsFromHBase() is called as before.

However, if the SelectMany operator is present and it’s invoking the frequency delegate, then a single AccessRecord instance is used to hold the arguments for the MapReduce.

ColumnFinder cf = new ColumnFinder(lambdaExpression.Body);
List<string> columns = cf.Columns;

AccessRecord[] records;
// Simple textual check for a SelectMany that invokes the HBase.frequency delegate.
string expressionText = expression.ToString();
if (expressionText.Contains("SelectMany") 
            && expressionText.Contains("HBase.frequency"))
{
    // No scan: pass a single record carrying the MapReduce arguments
    // (the filter value, and the name of the column it applies to).
    List<AccessRecord> aList = new List<AccessRecord>();
    aList.Add(new AccessRecord(columns.First<string>(), (cf.IsIP ? "ip" : "page")));
    records = aList.ToArray<AccessRecord>();
}
else
{
    // Call HBase and get the results.
    records = HBaseHelper.GetColumnsFromHBase(columns.First<string>(), cf.IsIP, cf.IsPage);
}
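The single AccessRecord here is an argument carrier, so its fields no longer mean "IP address" and "page": the ipAddress field receives the value from the where clause, the page field receives the name of the column that value was compared with, and MyFrequency() unpacks them into the three arguments of FrequencyMapRed.executeMapRed(). This Java sketch (class and method names are hypothetical) spells out that mapping.

```java
// Hypothetical Java rendering of the decoding MyFrequency() performs on the
// single argument-carrier record built in Execute().
final class FrequencyArgs {
    final String columnToCount;
    final String filterColumn;
    final String filterValue;

    private FrequencyArgs(String columnToCount, String filterColumn, String filterValue) {
        this.columnToCount = columnToCount;
        this.filterColumn = filterColumn;
        this.filterValue = filterValue;
    }

    // ipAddressField holds the value from the where clause;
    // pageField holds the name of the column it was compared with ("ip" or "page").
    static FrequencyArgs fromCarrier(String ipAddressField, String pageField) {
        // count whichever column was not used as the filter
        String countColumn = pageField.equals("ip") ? "page" : "ip";
        return new FrequencyArgs(countColumn, pageField, ipAddressField);
    }

    @Override
    public String toString() {
        return "executeMapRed(" + columnToCount + ", " + filterColumn + ", " + filterValue + ")";
    }
}
```

For the example query, fromCarrier("88.240.129.183", "ip") describes executeMapRed("page", "ip", "88.240.129.183"): count pages for the rows whose ip column equals 88.240.129.183.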

Testing the modified LINQ provider

The test code has two LINQ queries. The first uses the innermost-where strategy; the second uses SelectMany to invoke the frequency delegate, which calls the Java MapReduce implementation. The code also reports benchmark timings for the two queries.

static void Main(string[] args)
{
    // Stopwatch (System.Diagnostics) is better suited to timing than DateTime.Now.
    Stopwatch timer;

    QueryableHBaseData<AccessRecord> records =
        new QueryableHBaseData<AccessRecord>();

    Console.WriteLine("Scan only");
    timer = Stopwatch.StartNew();
    var query1 = from record in records
                    where record.ipAddress == "88.240.129.183"
                    orderby record.page
                    group record by record.page into grps
                    select new { page = grps.Key, count = grps.Count() };

    Console.WriteLine("  IP address 88.240.129.183 visited these pages");
    foreach (var pg in query1)
    {
        Console.WriteLine("     page: " + pg.page + " " + pg.count);
    }
    Console.WriteLine("Elapsed time: " + timer.Elapsed.TotalMilliseconds);

    Console.WriteLine("Scan and map/reduce");
    timer = Stopwatch.StartNew();
    var query2 = from record in records
                    where record.ipAddress == "88.240.129.183"
                    from count in HBase.frequency(record)
                    select count;

    Console.WriteLine("  IP address 88.240.129.183 visited these pages");
    foreach (KeyValuePair<string, int> kvp in query2)
    {
        Console.WriteLine("     page: " + kvp.Key + " " + kvp.Value);
    }
    Console.WriteLine("Elapsed time: " + timer.Elapsed.TotalMilliseconds);
}

Here’s the console output. The MapReduce LINQ provider runs in less than half the time of the innermost-where LINQ provider, because with the innermost-where provider the OrderBy and GroupBy query operators must process 8,474 AccessRecord objects in memory.

Conclusion

Building a LINQ provider for HBase is relatively straightforward if a table scan using the HBase client API is mapped directly to the innermost LINQ Where query operator. However, the Where operation can produce a very large collection of objects in memory, which makes subsequent query operators like OrderBy and GroupBy inefficient. Using HBase MapReduce to provide the equivalent of the Where, OrderBy and GroupBy query operators in a single distributed, parallel computation is much faster, but it’s difficult to parse an expression tree of arbitrary query operators to determine whether the query can be optimized into a set of MapReduce jobs. The simple solution is to add a MapReduce operator called frequency.

Acknowledgements and resources

Thanks to Steve Willsens and MSDN for the LINQ example LINQ to TerraServer Provider Sample, which supplied the classes that implement a LINQ expression parser. For an excellent discussion of the LINQ extensions to IEnumerable<T>, visit Raj Kaimal’s How LINQ to Object statements work. For a look at an HBase MapReduce implementation, visit Sujee Maniyam’s Hbase Map Reduce Example-Frequency Counter.

The source for this example can be downloaded here.

Creating .NET-based Mappers and Reducers for Hadoop with JNBridgePro

You can download the source code for the lab here.

Summary

The Apache Hadoop framework enables distributed processing of very large data sets. Hadoop is written in Java and offers only limited ways to integrate with “mapreducers” written in other languages. This lab demonstrates how you can use JNBridgePro to program directly against the Java-based Hadoop API to create .NET-based mapreducers.

Hadoop mapreducers, Java, and .NET

Apache Hadoop (or Hadoop, for short) is an increasingly popular Java-based tool used to perform massively parallel processing and analysis of large data sets. Such large data sets requiring special processing techniques are often called “Big Data.” The analysis of very large log files is a typical example of a task suitable for Hadoop. When processed using Hadoop, the log files are broken into many chunks, then farmed out to a large set of processes called “mappers,” which perform identical operations on each chunk. The results of the mappers are then sent to another set of processes called “reducers,” which combine the mapper output into a unified result. Hadoop is well-suited to running on large clusters of machines, particularly in the cloud. Both Amazon EC2 and Microsoft Windows Azure, among other cloud offerings, either provide or are developing targeted support for Hadoop.

In order to implement the functionality of a Hadoop application, the developer must write the mappers and reducers (sometimes collectively called “mapreducers”), then plug them into the Hadoop framework through a well-defined API. Because the Hadoop framework is written in Java, most mapreducer development is also done in Java. While it’s possible to write the mapreducers in languages other than Java, through a mechanism known as Hadoop Streaming, this isn’t an ideal solution as the data sent to the mapreducers over standard input needs to be parsed and then converted from text to whatever native form is being processed. Handling the data being passed through standard input and output incurs overhead, as well as additional coding effort.
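To make that overhead concrete, here is roughly what a streaming reducer for word counting has to do, sketched in Java for consistency with the rest of this lab (a streaming reducer could be written in any language). It assumes streaming’s default text format of one tab-separated key/value pair per line, with the reducer’s input sorted by key. Every value arrives as text and must be parsed, and the reducer must detect key boundaries itself, which a reducer written against the Hadoop API gets from the framework.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;

final class StreamingStyleReducer {
    // Reads "word<TAB>count" lines, sorted by word, and writes "word<TAB>total" lines.
    // Assumes well-formed input: every line contains a tab followed by an integer.
    static void reduce(BufferedReader in, PrintWriter out) throws IOException {
        String currentKey = null;
        int sum = 0;
        String line;
        while ((line = in.readLine()) != null) {
            int tab = line.indexOf('\t');
            String key = line.substring(0, tab);                          // parse the key
            int value = Integer.parseInt(line.substring(tab + 1).trim()); // text -> int
            if (currentKey != null && !currentKey.equals(key)) {
                out.println(currentKey + "\t" + sum);                     // key changed: emit
                sum = 0;
            }
            currentKey = key;
            sum += value;
        }
        if (currentKey != null) {
            out.println(currentKey + "\t" + sum);                         // last key
        }
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        StringWriter result = new StringWriter();
        reduce(new BufferedReader(new StringReader("hadoop\t1\nhadoop\t1\njava\t1\n")),
                new PrintWriter(result));
        System.out.print(result); // "hadoop<TAB>2", then "java<TAB>1"
    }
}
```

A real streaming reducer would read from new BufferedReader(new InputStreamReader(System.in)) and write to System.out.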

The alternative that we present in this lab is a way to create .NET-based mapreducers by programming against the Hadoop API using JNBridgePro. In this lab, the .NET-based mapreducers run in the actual Hadoop Java processes (which is possible if the Hadoop cluster is running on Windows machines), but we will also discuss ways to run the .NET sides outside the Java processes. In this example, we show how to host the maximal amount of mapreducer functionality in .NET, although you could use the same approach to host as much or as little of the functionality in .NET as you like, and host the rest in Java. You will come away with an understanding of how to create .NET-based Hadoop mapreducers and deploy them as part of a Hadoop application. The code we provide can be used as a pattern upon which you can create your own .NET-based mapreducers.

You might want or need to write mapreducers in .NET for a number of reasons. For example, you might have an investment in .NET-based libraries with specialized functionality that needs to be used in the Hadoop application. Your organization may have more developers with .NET skills than with Java skills. You may be planning to run your Hadoop application on Windows Azure, where, even though the Hadoop implementation is still in Java and there is support for Java, the majority of the tooling is far more friendly to .NET development.

This lab is not a tutorial in Hadoop programming, or in deploying and running Hadoop applications. For the latter, there is a good tutorial here. The tutorial refers to some older versions of Eclipse and Hadoop, but will work with more recent versions. Even if you’re familiar with Hadoop, the example builds on some of the setup in the tutorial, so it might be worth working through the tutorial beforehand. We will point out these dependencies when we discuss how the Hadoop job should be set up, so you can decide whether to build on top of the tutorial or make changes to the code in the lab.

Example

The lab is based on the standard “word count” example that comes with the Hadoop distribution, in which the occurrences of all words in a set of documents are counted. We’ve chosen this example because it’s often used in introductory Hadoop tutorials, and is usually well understood by Hadoop programmers. Consequently, we won’t spend much time talking about the actual functionality of the example: that is, how the word counting actually works.

This example moves all the Java-based functionality of the “word count” mapreducers into C#. As you will see, we need to leave a small amount of the mapreducer in Java as a thin layer. Understanding why this thin layer is necessary, and how it works, provides a design pattern that can be used to create other .NET-based mapreducers.

Interoperability strategy

At first glance, the obvious approach to interoperability would be to use JNBridgePro to proxy the Java-based Mapper and Reducer interfaces and the MapReduceBase abstract class into .NET, then program in C# against these proxies. Then, still using JNBridgePro, we would proxy the .NET-based mapper and reducer classes, and register those proxies with the Hadoop framework. The resulting project would use bidirectional interoperability and would be quite straightforward. Unfortunately, this approach leads to circularities and name clashes: the proxied mapper and reducer will contain proxied parameters with the same names as the actual Java-based Hadoop classes. In other words, there will be proxies of proxies, and the result will not work. While it is possible to edit the jar files and perform some other unusual actions, the result would be confusing and would not work in all cases. So we need to take a different approach.

Instead, we will create thin Java-based wrapper classes implementing the Mapper and Reducer interfaces, which will interact with the hosting Hadoop framework, and which will also call the .NET-based functionality through proxies, making this a Java-to-.NET project. In the cases where the .NET functionality needs to access Java-based Hadoop objects, particularly OutputCollectors and Iterators, it will be done indirectly, through callbacks. The resulting code is much simpler and more elegant.

The original WordCount example

Let’s start with the original Java-based “word count” mapper and reducer, from the example that comes with Hadoop. We will not be using this code in our example, and we will not be discussing how it works (it should be fairly straightforward if you’re familiar with Hadoop), but it will be useful as a reference when we move to the .NET-based version.

Here is the mapper:

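import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * WordCount mapper class from the Apache Hadoop examples.
 * Counts the words in each line.
 * For each line of input, break the line into words and emit them as
 * (word, 1).
 */
public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}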

And here is the reducer:

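import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * From the Apache Hadoop examples.
 * A WordCount reducer class that just emits the sum of the input values.
 */
public class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}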

Migrating the functionality to .NET

What we want to do next is migrate as much of the mapper and reducer functionality as possible to .NET (in this case, C#) code. Note that we can’t migrate all of it verbatim; the Java code references Hadoop-specific classes like Text, IntWritable, LongWritable, OutputCollector, and Reporter, as well as other crucial Java classes such as Iterator. Text, IntWritable, and LongWritable can be easily converted to string, int, and long, which are automatically converted by JNBridgePro. However, while it is possible to convert classes like OutputCollector and Iterator to and from native .NET classes like ArrayList, such conversions are highly inefficient, since they involve copying every element in the collection, perhaps multiple times. Instead, we will continue to use the original OutputCollector and Iterator classes on the Java side, and the .NET code will only use them indirectly, knowing nothing about the actual classes. Callbacks provide a mechanism for doing this.

Here is the C# code implementing the mapper and reducer functionality:

using System;

namespace DotNetMapReducer
{
    // used for callbacks to the OutputCollector
    public delegate void collectResult(string theKey, int theValue);

    // used for callbacks to the Iterator;
    // returns null if there are no more values, otherwise a boxed integer
    public delegate object getNextValue();

    public class DotNetMapReducer
    {
        public void map(string line, collectResult resultCollector)
        {
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreTokens())
            {
                string nextToken = st.nextToken();
                resultCollector(nextToken, 1);
            }
        }

        public void reduce(string key, getNextValue next, collectResult resultCollector)
        {
            int sum = 0;
            object nextValue = next(); // get the next one, if there
            while (nextValue != null)
            {
                sum += (int)nextValue;
                nextValue = next();
            }
            resultCollector(key, sum);
        }
    }

    public class StringTokenizer
    {
        // same default delimiters as java.util.StringTokenizer: " \t\n\r\f"
        private static char[] defaultDelimiters = { ' ', '\t', '\n', '\r', '\f' };
        private string[] tokens;
        private int numTokens;
        private int curToken;

        public StringTokenizer(string line, char[] delimiters)
        {
            // drop empty entries so runs of delimiters don't yield empty "words",
            // matching java.util.StringTokenizer
            tokens = line.Split(delimiters, StringSplitOptions.RemoveEmptyEntries);
            numTokens = tokens.Length;
            curToken = 0;
        }

        public StringTokenizer(string line)
            : this(line, defaultDelimiters)
        {
        }

        public bool hasMoreTokens()
        {
            return curToken < numTokens;
        }

        public string nextToken()
        {
            if (hasMoreTokens()) return tokens[curToken++];
            else throw new IndexOutOfRangeException();
        }
    }
}

StringTokenizer is just a .NET-based reimplementation of the standard Java StringTokenizer class, and we won’t be discussing it further.

Note the two delegates collectResult and getNextValue that are used by the mapreducer. These are ways to call back into the Java code for additional functionality, possibly using classes like OutputCollector and Iterator that the .NET code knows nothing about. Also note that the .NET code uses string and int where the Java code had Text and IntWritable (and LongWritable); the wrapper code will handle the conversions.
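The delegate-based contract can be tried out without JNBridgePro or Hadoop. This Java sketch mirrors the C# map() and reduce() methods, with two functional interfaces standing in for the collectResult and getNextValue delegates (the interface and class names are hypothetical). Lambdas over an ordinary list and Iterator play the part that the Java wrappers play with Hadoop’s OutputCollector and Iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical Java mirrors of the collectResult and getNextValue delegates.
interface CollectResult {
    void invoke(String key, int value);
}

interface GetNextValue {
    Object invoke(); // null when there are no more values
}

final class CallbackMapReducer {
    // Same logic as the C# map(): one (token, 1) result per word.
    static void map(String line, CollectResult collector) {
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            collector.invoke(st.nextToken(), 1);
        }
    }

    // Same logic as the C# reduce(): pull values until null, then emit the sum.
    static void reduce(String key, GetNextValue next, CollectResult collector) {
        int sum = 0;
        for (Object v = next.invoke(); v != null; v = next.invoke()) {
            sum += (Integer) v;
        }
        collector.invoke(key, sum);
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        CollectResult toList = (k, v) -> collected.add(k + "=" + v);

        map("to be or not to be", toList);

        Iterator<Integer> ones = Arrays.asList(1, 1).iterator();
        GetNextValue fromIterator = () -> ones.hasNext() ? ones.next() : null;
        reduce("to", fromIterator, toList);

        System.out.println(collected); // [to=1, be=1, or=1, not=1, to=1, be=1, to=2]
    }
}
```

The map/reduce logic never sees a List or an Iterator, only the two callbacks, which is what lets the real .NET code stay ignorant of Hadoop’s Java types.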

Once we have the .NET functionality built and tested, we need to proxy the mapreducer class and supporting classes. We then incorporate the proxy jar file, jnbcore.jar, and bcel-5.1-jnbridge.jar into our Java Hadoop project and can start writing the Java-based mapper and reducer wrappers. Here they are:

public class MapperWrapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    // Creating a MapReducerHelper forces its static initializer (the JNBridgePro
    // configuration) to run before any proxy is used.
    private static MapReducerHelper mrh = new MapReducerHelper();
    private DotNetMapReducer dnmr = new DotNetMapReducer();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        OutputCollectorHandler och = new OutputCollectorHandler(output);
        try {
            dnmr.map(value.toString(), och);
        } finally {
            Callbacks.releaseCallback(och); // free the callback's thread even on failure
        }
    }
}

public class ReducerWrapper extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    private static MapReducerHelper mrh = new MapReducerHelper();
    private DotNetMapReducer dnmr = new DotNetMapReducer();

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        IteratorHandler ih = new IteratorHandler(values);
        OutputCollectorHandler och = new OutputCollectorHandler(output);
        try {
            dnmr.reduce(key.toString(), ih, och);
        } finally {
            Callbacks.releaseCallback(ih);
            Callbacks.releaseCallback(och);
        }
    }
}

Note that the only purpose of these thin wrappers is to interface with the Hadoop framework, host the .NET-based functionality, and handle passing of values to and from the .NET components (along with necessary conversions).

There are two callback objects, IteratorHandler and OutputCollectorHandler, which encapsulate the Iterator and OutputCollector objects. These are passed where the .NET map() and reduce() methods expect delegate parameters, and are used to hide the actual Hadoop Java types from the .NET code. The mapreducer will simply call the resultCollector() or getNextValue() delegate, and the action will be performed, or value returned, without the .NET side knowing anything about the mechanism used by the action.

Since callbacks consume resources (particularly, a dedicated thread for each callback object), and there can be many invocations of map() and reduce(), it is important to release the callback objects (using the Callbacks.releaseCallback() API) to release those threads when they are no longer needed. If you do not make those calls, performance will degrade substantially.
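Because a callback is released only when Callbacks.releaseCallback() actually runs, an exception thrown from the .NET map() or reduce() would skip a release call placed after it. This sketch uses a hypothetical counter-based registry (not JNBridgePro code) to show the try/finally pattern that guarantees the release happens either way.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for callback bookkeeping: acquire() models creating a
// callback object (which holds a dedicated thread), release() models
// Callbacks.releaseCallback().
final class CallbackRegistry {
    static final AtomicInteger live = new AtomicInteger();

    static Object acquire() {
        live.incrementAndGet();
        return new Object();
    }

    static void release(Object callback) {
        live.decrementAndGet();
    }
}

final class GuardedCallback {
    // Releasing in a finally block means an exception thrown by the .NET call
    // cannot leave the callback (and its thread) allocated.
    static void callWithCallback(Runnable dotNetCall) {
        Object callback = CallbackRegistry.acquire();
        try {
            dotNetCall.run();
        } finally {
            CallbackRegistry.release(callback);
        }
    }
}
```

The exception still propagates to Hadoop, which fails the task as usual; the only difference is that the callback no longer leaks.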

Here is the Java code for the two callback classes:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;

public class OutputCollectorHandler implements collectResult
{
    private OutputCollector<Text, IntWritable> outputCollector = null;

    public OutputCollectorHandler(OutputCollector<Text, IntWritable> theCollector)
    {
        outputCollector = theCollector;
    }

    public void Invoke(String theKey, int theValue)
    {
        try
        {
            outputCollector.collect(new Text(theKey), new IntWritable(theValue));
        }
        catch (IOException e)
        {
            // collect() fails if the output can't be written; Invoke() can't declare
            // checked exceptions, so rethrow instead of silently dropping output
            throw new RuntimeException(e);
        }
    }
}

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;

import System.BoxedInt;
import System.Object; // proxy for System.Object; shadows java.lang.Object in this file

public class IteratorHandler implements getNextValue
{
    private Iterator<IntWritable> theIterator = null;

    public IteratorHandler(Iterator<IntWritable> iterator)
    {
        theIterator = iterator;
    }

    // returns null if no more values, otherwise returns a boxed integer
    public Object Invoke()
    {
        if (!theIterator.hasNext()) return null;
        else
        {
            IntWritable iw = theIterator.next();
            int i = iw.get();
            return new BoxedInt(i);
        }
    }
}

The two callback objects encapsulate the respective Java collections and perform the appropriate conversions when their Invoke() methods are called. Rather than providing the typical hasNext()/next() interface, the IteratorHandler has a single Invoke() method (this is how callbacks work in Java-to-.NET projects), so we’ve written Invoke() to return null when there are no more values, and otherwise to return the next integer, boxed so that it can be passed in place of a System.Object. There are other ways to do this, but this approach works for iterators that return primitive values, which can never be null.
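As one example of another way, a callback could hand back several values per Invoke() call, reducing the number of Java/.NET round trips for keys with many values. The sketch below is a hypothetical design, not part of the lab’s code: it returns up to batchSize values as an int array, and an empty array at the end. It uses Iterator<Integer> instead of Iterator<IntWritable> to stay self-contained; the getNextValue delegate and the C# reduce() loop would have to change to match, and whether batching pays off (and how JNBridgePro marshals an int[]) is an assumption to check and measure.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical alternative to IteratorHandler: each invoke() returns up to
// batchSize values, and an empty array once the iterator is exhausted.
final class BatchingIteratorHandler {
    private final Iterator<Integer> iterator;
    private final int batchSize;

    BatchingIteratorHandler(Iterator<Integer> iterator, int batchSize) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be >= 1");
        this.iterator = iterator;
        this.batchSize = batchSize;
    }

    int[] invoke() {
        int[] buffer = new int[batchSize];
        int n = 0;
        while (n < batchSize && iterator.hasNext()) {
            buffer[n++] = iterator.next();
        }
        return Arrays.copyOf(buffer, n); // length 0 signals "no more values"
    }

    // Consumer side, as a reducer would use it: sum until an empty batch arrives.
    static int sumAll(BatchingIteratorHandler handler) {
        int sum = 0;
        for (int[] batch = handler.invoke(); batch.length > 0; batch = handler.invoke()) {
            for (int v : batch) {
                sum += v;
            }
        }
        return sum;
    }
}
```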

Next, we need to configure JNBridgePro. For maximum flexibility, we’ve chosen to configure it programmatically, through the MapReducerHelper class. Since configuration can happen only once in each process, and must happen before any proxy call, MapReducerHelper performs the configuration inside its static initializer, which runs when the class is initialized. This happens only once per process and is guaranteed to be done before any proxies are called. Here is the Java-based MapReducerHelper:

import java.util.Properties;

public class MapReducerHelper
{
    // Runs once, when the class is initialized, before any proxy call.
    static
    {
        Properties p = new Properties();
        p.put("dotNetSide.serverType", "sharedmem");
        p.put("dotNetSide.assemblyList.1",
                "C:/DotNetAssemblies/DotNetMapReducer.dll");
        p.put("dotNetSide.javaEntry",
                "C:/Program Files/JNBridge/JNBridgePro v6.0/4.0-targeted/JNBJavaEntry.dll");
        p.put("dotNetSide.appBase",
                "C:/Program Files/JNBridge/JNBridgePro v6.0/4.0-targeted");
        DotNetSide.init(p); // DotNetSide comes from jnbcore.jar
    }
}

The paths in the configuration will likely be different in your deployment, so you will need to adjust them accordingly.
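The once-only guarantee comes from Java’s class-initialization rules: a static initializer runs exactly once per class (per class loader), before the class is first used, and the JVM makes that initialization thread-safe. In the wrappers, the `private static MapReducerHelper mrh = new MapReducerHelper();` fields are what trigger it. This sketch (hypothetical class name, no JNBridgePro calls) demonstrates the behavior.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MapReducerHelper. The static block runs once,
// when the class is initialized, no matter how many instances are created.
final class OneTimeSetup {
    static final AtomicInteger initCount = new AtomicInteger();
    static final Properties props = new Properties();

    static {
        props.setProperty("dotNetSide.serverType", "sharedmem");
        // MapReducerHelper calls DotNetSide.init(p) at this point.
        initCount.incrementAndGet();
    }
}
```

Creating two OneTimeSetup instances leaves initCount at 1, just as every MapperWrapper and ReducerWrapper in a task JVM shares a single JNBridgePro configuration.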

Finally, we create the Java-based Hadoop driver in the usual way, specifying the new wrappers as the mapper and reducer classes:

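import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCountDotNetDriver
{
    public static void main(String[] args)
    {
        JobClient client = new JobClient();
        JobConf conf = new JobConf(WordCountDotNetDriver.class);

        // specify output types
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(conf, new Path("In"));
        FileOutputFormat.setOutputPath(conf, new Path("Out"));

        // specify a mapper, and reuse the reducer as a combiner
        conf.setMapperClass(MapperWrapper.class);
        conf.setCombinerClass(ReducerWrapper.class);

        // specify a reducer
        conf.setReducerClass(ReducerWrapper.class);

        client.setConf(conf);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}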
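The driver reuses ReducerWrapper as the combiner. That is safe here because summing counts is associative and commutative, and the reducer’s input and output types are the same (Text, IntWritable); Hadoop may run a combiner zero or more times, so it must not change the final result. The plain-Java sketch below (hypothetical names, no Hadoop classes) shows that partial sums per input split followed by a final sum give the same totals as counting everything at once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

final class CombinerSketch {
    // Map one input split to (word, 1) pairs and combine them locally by summing,
    // which is what the combiner does with each map task's output.
    static Map<String, Integer> mapAndCombine(List<String> split) {
        Map<String, Integer> combined = new TreeMap<>();
        for (String line : split) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                combined.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return combined;
    }

    // Reduce: add up the partial counts for each word across all splits.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                totals.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Integer> split1 = mapAndCombine(Arrays.asList("to be or not to be"));
        Map<String, Integer> split2 = mapAndCombine(Arrays.asList("to do"));
        System.out.println(reduce(Arrays.asList(split1, split2)));
        // {be=2, do=1, not=1, or=1, to=3}
    }
}
```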

Deploying and running the Hadoop application

At this point we can deploy and run the application. Start by deploying the application to your Hadoop cluster. (It needs to be a cluster of Windows machines, since we’re using shared memory. We’ll talk about Linux clusters later.) If you have a setup like the one described in the aforementioned tutorial, just copy the source files over and build.

Make sure that the appropriate version of JNBridgePro (x86 or x64, depending on whether you’re running Hadoop on 32-bit or 64-bit Java) is installed on all the machines in the cluster, along with an appropriate license (which may be an evaluation license). Also make sure that the dotNetSide.javaEntry and dotNetSide.appBase properties in MapReducerHelper agree with the installed location of JNBridgePro. If they don’t, either install JNBridgePro in the correct place, or edit MapReducerHelper and rebuild.

You will need to put the proxy jar file as well as jnbcore.jar and bcel-5.1-jnbridge.jar in the Hadoop classpath. There are a couple of ways to do this. You can add the paths to these files to the HADOOP_CLASSPATH environment variable. Alternatively, you can copy these jar files to the Hadoop lib folder.

Finally, copy to each machine in the Hadoop cluster the .NET DLL file containing the .NET classes, and put it in the location specified in the dotNetSide.assemblyList.1 property in MapReducerHelper.

Once this is all done, start up all your Hadoop nodes. Make sure that in your HDFS service you’ve created a directory “In”, and that you’ve uploaded all the .txt files in the root Hadoop folder: mostly licensing information, release notes, and readme documents. Feel free to load additional documents. If there is an “Out” directory, delete it along with its contents. (If the “Out” directory exists when the program is run, an exception will be thrown.)

Now, run the Hadoop application. It will run to completion, and you will find an HDFS folder named “Out” containing a document with the result.

The Hadoop job that you just ran worked in exactly the same way as any ordinary all-Java Hadoop job, but the mapreducer functionality was written in .NET and was running in the same processes as the rest of the Hadoop framework. No streaming was required to do this, and we were able to program against native Hadoop APIs.

Running the Hadoop job on Linux machines

Because we’ve chosen to run the Hadoop application using JNBridgePro shared-memory communications, our Hadoop cluster must run on Windows machines. With shared memory, the .NET side runs inside the same process as the Java side, so the .NET Framework must be installed on every machine that runs Hadoop processes.

It is possible to run the application on a cluster of Linux machines, but you will need to change the configuration to use tcp/binary communications, and then run .NET-side processes on one or more Windows machines. The simplest way to run a .NET side is to configure and use the JNBDotNetSide.exe utility that comes with the JNBridgePro installation. Configure each Java side to point to one of the .NET-side machines. Several Java sides can share a .NET side without any problem, although the fewer Java sides talking to each .NET side, the better the performance.

Note that changing from shared memory to tcp/binary does not require any changes to your .NET or Java code. You can use the same binaries as before; you only need to change the configuration.

Conclusion

This lab has shown how you can write .NET-based Hadoop mapreducers without having to use Hadoop streaming or implement parsing of the stream. The .NET code can include as much or as little of the mapreducer functionality as you desire; the rest can be placed in Java-based wrappers. In the example we’ve worked through, the .NET code contains all of the mapreducer functionality except for the minimal functionality required for connectivity with the Hadoop framework itself. The .NET code can run in the same processes as the rest of the Hadoop application (in the case that Hadoop is running on a Windows cluster), or on different machines if Hadoop is running on a Linux cluster.

You can use the example code as a generalized pattern for creating the wrappers that connect the Hadoop framework to the .NET-based mapreducers. The code is simple and straightforward, and variants will work in most mapreduce scenarios.

You can enhance the provided code for additional scenarios. For example, if you want to use tcp/binary, you can modify the Java-side configuration (in class MapReducerHelper) so that any specific instance of the Java side can choose to connect to one of a set of .NET sides running on a cluster; the assignments do not have to be made by hand. You can also use the provided code to support tight integration of .NET mapreducers in a Hadoop application running on Windows Azure. This approach provides more flexibility and improved performance over the Hadoop streaming used by the Azure implementation.
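Here is one way such an automatic assignment might look: a minimal sketch, assuming each Java side can compute a stable identifier such as its host name, and hashing it onto a list of .NET-side endpoints. The class name, host names and port are hypothetical, and while dotNetSide.serverType appears in MapReducerHelper, the "tcp" value and the dotNetSide.host and dotNetSide.port property names are assumptions to verify against the JNBridgePro documentation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

final class DotNetSideChooser {
    // Deterministically spread Java sides over the available .NET sides by hashing
    // a per-process identifier. The same id always maps to the same endpoint.
    static String choose(List<String> endpoints, String processId) {
        if (endpoints.isEmpty()) throw new IllegalArgumentException("no .NET sides configured");
        int index = Math.floorMod(processId.hashCode(), endpoints.size());
        return endpoints.get(index);
    }

    // ASSUMPTION: property names and the "tcp" value are illustrative;
    // check them against the JNBridgePro documentation before use.
    static Properties tcpProperties(String endpoint) {
        String[] hostPort = endpoint.split(":");
        Properties p = new Properties();
        p.setProperty("dotNetSide.serverType", "tcp");
        p.setProperty("dotNetSide.host", hostPort[0]);
        p.setProperty("dotNetSide.port", hostPort[1]);
        return p;
    }

    public static void main(String[] args) {
        // hypothetical .NET-side machines and example port
        List<String> sides = Arrays.asList("winhost1:8085", "winhost2:8085");
        String chosen = choose(sides, "hadoop-node-17");
        System.out.println(chosen + " -> " + tcpProperties(chosen));
    }
}
```

In MapReducerHelper, the identifier could come from InetAddress.getLocalHost().getHostName(), and the resulting properties would replace the shared-memory ones passed to DotNetSide.init(). All task JVMs on one host would then pick the same .NET side; adding a per-process value to the identifier would spread them further.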

We expect that the provided code and design patterns will be useful in many scenarios we haven’t even thought of. We’d love to hear your comments, suggestions, and feedback – you can contact us at labs@jnbridge.com.
