How to Scale Native (C/C++) Applications on Pivotal's MPP Platform: Edge Detection Example, Part 1

December 11, 2014 Gautam Muralidhar

Joint work performed by Gautam Muralidhar and Srivatsan Ramanujam.

One of the challenges organizations face while adopting big data platforms is seamlessly integrating legacy applications. Fear of having to redesign and re-engineer applications built over many years creates resistance to adopting platforms such as MPP databases and Apache Hadoop®. Many existing applications were developed in native programming languages such as C and C++, which makes it expensive to re-engineer them for SQL- and Java-friendly big data platforms. In this two-part blog post, we'll demonstrate how a sample native application can be seamlessly integrated and scaled for data-parallel problems on HAWQ, Pivotal's SQL-on-Hadoop solution.

There are several ways to scale native applications on Pivotal's MPP platform. For instance, we could compile the native application into a PL/C user-defined function (UDF), use the MADlib C++ abstraction layer, or use one of the procedural languages, such as PL/Python, PL/R, or PL/Java, that support invoking native code. For Python, we could build a C extension to be invoked through PL/Python, or simply use the Python ctypes library to invoke the native application via a PL/Python UDF. In this series, we illustrate two approaches: using the ctypes library to invoke the native application through PL/Python UDFs, and compiling the native application directly into a PL/C UDF.

In part one, we consider the task of edge detection, an important problem in computer vision that is often used as a building block for higher-level tasks such as object recognition and tracking. We show how a native application written in C++ can be scaled on Pivotal's MPP platform through PL/Python, the least intrusive of the approaches. In part two, we will show how the same task can be achieved via PL/C, and we'll discuss the pros and cons of both approaches.

Sample Native Application in C++

The C++ application we consider here is an image processing application that performs Canny edge detection.

Edge detection is an image processing operation typically used during a feature computation step while building computer vision applications for tasks such as object recognition.

For example, the following is an image of a girl on the left, and an image depicting the detected edges on the right.


(Source: Wikipedia – Edge Detection)

Such applications involve processing several thousand images to train machine learning models that can learn to recognize objects from a large collection of images. Computing edges and features across many images is an embarrassingly data-parallel problem that can benefit from technologies such as HAWQ.

Our sample application uses OpenCV, a popular open source computer vision library with a rich set of image processing and computer vision functionality. OpenCV includes an implementation of Canny edge detection, which we leverage in our application. As written, the application was developed for smaller-scale data and will not, on its own, scale to big data volumes. The code snippet in Figure 1 illustrates our sample C++ application.

/*
 * Gautam Muralidhar and Srivatsan Ramanujam, 28 Oct 2014
 * C++ functions compiled into a dynamic library to be invoked from PL/Python via ctypes.
 * Canny edge detection from OpenCV.
 */
#include <opencv2/opencv.hpp>
#include <opencv2/imgproc/imgproc.hpp>
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <vector>

using namespace cv;
using namespace std;

extern "C" {
    // Find edges in images using OpenCV's implementation of Canny's edge detection algorithm.
    // Inputs: a) char *bytes - the raw image byte stream, and b) uint nBytes - the number of bytes in the byte stream
    // Output: an unsigned integer array of 1s and 0s, where 1s denote edge locations.
    // The length of the returned array = number of image rows x number of image columns.
    uint* edgeDetectionFromByteStream(char *bytes, uint nBytes) {
        Mat srcImg, srcGray;
        Mat dstImg, onesImg, edges;
        // Copy the input byte stream into a C++ vector of unsigned char
        vector<unsigned char> src;
        for (int i = 0; i < nBytes; i++) {
            src.push_back(bytes[i]);
        }
        // Read the image from the buffer in memory using the OpenCV imdecode function
        srcImg = imdecode(src, CV_LOAD_IMAGE_COLOR);
        if (srcImg.data) {
            dstImg.create(srcImg.size(), CV_8UC1);
            onesImg.create(srcImg.size(), CV_8UC1);
            // Convert the input image to gray-scale using the OpenCV cvtColor function
            cvtColor(srcImg, srcGray, COLOR_BGR2GRAY);
            // Smooth the gray-scale image to reduce noise using the OpenCV blur function
            blur(srcGray, edges, Size(3,3));
            // Call the OpenCV Canny function to find edges
            Canny(edges, edges, 10, 30, 3);
            // Create an image of 1s and 0s, where 1 denotes an edge pixel
            dstImg = Scalar::all(0);
            onesImg = Scalar::all(1);
            onesImg.copyTo(dstImg, edges);
            // Prepare the final result array
            uint* result = new uint[dstImg.rows*dstImg.cols];
            for (int i = 0; i < dstImg.rows; i++) {
                for (int j = 0; j < dstImg.cols; j++) {
                    result[(edges.cols)*i+j] = uint(dstImg.at<unsigned char>(i,j));
                }
            }
            return result;
        } else {
            // Return a single-element 0 array if there is a problem loading the data
            uint* result = new uint[1];
            result[0] = 0;
            return result;
        }
    }
}

extern "C" {
    // Get the image size from the raw image byte stream.
    // Inputs: a) char *bytes - the raw image byte stream, and b) uint nBytes - the number of bytes in the byte stream
    // Output: an unsigned integer array of two elements - number of rows and number of columns
    uint* getImgSizeFromByteStream(char *bytes, uint nBytes) {
        // Declare the OpenCV source image matrix
        Mat srcImg;
        // Copy the input byte stream into a C++ vector of unsigned char
        vector<unsigned char> src;
        for (int i = 0; i < nBytes; i++) {
            src.push_back(bytes[i]);
        }
        // Read the image from the buffer in memory using OpenCV's imdecode function
        srcImg = imdecode(src, CV_LOAD_IMAGE_COLOR);
        uint* result = new uint[2];
        result[0] = srcImg.rows;
        result[1] = srcImg.cols;
        return result;
    }
}

Figure 1: An example C++ application for Canny edge detection

The main function of interest is edgeDetectionFromByteStream(), which takes a byte buffer that contains the raw image byte stream (for example, from a JPEG image file) and the number of bytes in the buffer as inputs. This function is exposed to the calling application and can be part of a bigger image processing workflow in which edge detection is one of the steps.

The edgeDetectionFromByteStream() function decodes the input image byte stream into an OpenCV matrix, converts the image to grayscale, blurs it to reduce noise, calls the OpenCV Canny function to find edges, and finally prepares an unsigned integer buffer of ones and zeros (ones at the edge locations detected by Canny's algorithm, zeros elsewhere) to return to the caller. We have also defined a second function, getImgSizeFromByteStream(), which takes an image byte stream and returns an unsigned integer array of two elements: the number of rows and the number of columns in the image. The byte stream decoding, grayscale conversion, blurring, and Canny edge detection are all functionality provided by OpenCV.

The Canny edge detection application, while relatively simple in what it accomplishes, embodies characteristics, such as dependencies on external libraries, that are typical of the functions that make up a larger C++ application.

We will next illustrate how our sample application can be run in HAWQ, using the Python ctypes library within a PL/Python UDF to process many images in parallel.

Large Scale Parallel Image Processing via PL/Python

PL/Python is the glue that binds the rich set of libraries in the PyData stack with the data residing in the database to tackle data science problems. Python offers a low barrier to entry for new developers and supports an incredible breadth of applications. Along with PL/R, it is very popular among data scientists at Pivotal. For an overview of PL/Python, please refer to our PyData talk, "Python Powered Data Science at Pivotal" (video, slides). We chose PL/Python for the first of the two approaches because of the ease of invoking native libraries from Python with minimal code changes; one does not have to be a proficient C/C++ developer to scale a native application on Pivotal's MPP platform if they are fluent in Python.

At a high level, running native C++ applications in HAWQ via PL/Python involves the following steps:

  1. Compiling the native application as a shared object or dynamic library.
  2. Installing the shared object and its dependent dynamic libraries (e.g., OpenCV) on all HAWQ segment nodes.
  3. Creating a PL/Python driver UDF in HAWQ, which invokes the native function using the Python ctypes library.
  4. Ingesting the image data into a HAWQ table and invoking the PL/Python driver UDF on that table.

We will next look at each of these steps in detail:

1) Compiling the Native Application as a Shared Object

To be able to invoke native applications in HAWQ, the application must first be compiled as a dynamic library (.so file). For example, on a CentOS system, the command in Example 1 can be used to compile the Canny edge detection application as a shared object.

g++ -shared -Wl,-soname,canny_edge_detection -fPIC -o canny_edge_detection.so CannyEdgeDetectionCtypes.cpp -lopencv_core -lopencv_imgproc -lopencv_highgui

Example 1: Building the C++ application into a shared object

As illustrated in Example 1, the source code for our example application is contained in the file CannyEdgeDetectionCtypes.cpp, and the -l link options specify the external OpenCV libraries to link against. Note that the linker needs to be able to find the installed OpenCV libraries (typically in /usr/local/lib); if they are not in a default search location, add their directory with the -L option, and make sure LD_LIBRARY_PATH includes the same directory so the libraries can be loaded at run time.
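Before copying the shared object to the cluster, it can be useful to sanity check it locally. The following is a minimal sketch, not part of the original application: it assumes the compiled canny_edge_detection.so and a sample image (hypothetically named sample.jpg) sit in the current directory, and it uses the same ctypes argtypes/restype pattern that the PL/Python UDF in Example 4 will use.

# Minimal local sanity check for the compiled shared object (a sketch;
# 'canny_edge_detection.so' and 'sample.jpg' are assumed local paths).
import ctypes
from numpy.ctypeslib import ndpointer

lib = ctypes.cdll.LoadLibrary('./canny_edge_detection.so')

# Describe the C signatures so ctypes marshals arguments correctly
lib.getImgSizeFromByteStream.argtypes = [ctypes.c_char_p, ctypes.c_uint]
lib.getImgSizeFromByteStream.restype = ndpointer(dtype=ctypes.c_uint, shape=(2,))

with open('sample.jpg', 'rb') as f:
    buf = f.read()

rows, cols = lib.getImgSizeFromByteStream(ctypes.c_char_p(buf), ctypes.c_uint(len(buf)))

# The edge map length depends on the image size returned above
lib.edgeDetectionFromByteStream.argtypes = [ctypes.c_char_p, ctypes.c_uint]
lib.edgeDetectionFromByteStream.restype = ndpointer(dtype=ctypes.c_uint, shape=(rows * cols,))
edges = lib.edgeDetectionFromByteStream(ctypes.c_char_p(buf), ctypes.c_uint(len(buf)))

print(rows, cols, int(edges.sum()), 'edge pixels')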

2) Installing the Dynamic Library and Its Dependencies on All HAWQ Segment Nodes

Once the shared object has been built, the next step is to install it and its dependent libraries on all HAWQ segment nodes. This is achieved via the gpscp command, as illustrated in Example 2.

gpscp -f hostfile canny_edge_detection.so libopencv_core.so.2.4 libopencv_imgproc.so.2.4 libopencv_highgui.so.2.4 =:/usr/local/lib/ds

Example 2: Installing the shared object and dependent libraries on all HAWQ segment nodes

The inputs to the gpscp command are the hostfile parameter (a file containing the host names of the segment nodes in HAWQ; in our case the segments are named hdw1 to hdw16), the file names of the shared objects to copy, and the destination directory on the segment nodes (in our case /usr/local/lib/ds). Once the dynamic libraries have been distributed to all segment nodes, update the LD_LIBRARY_PATH environment variable to include the directory we copied the files to, and restart HAWQ. This can be done by appending the export to ~/.bashrc on all segment nodes, as shown in Example 3:

gpssh -f hostfile 'echo "export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib/ds" >> ~/.bashrc'
gpstop -r

Example 3: Updating LD_LIBRARY_PATH on all segment nodes and restarting HAWQ

3) Creating a PL/Python UDF to Invoke the C++ Application in HAWQ

Once the shared object and its dependent libraries have been distributed to the segment nodes, our C++ application can be invoked in HAWQ via a PL/Python UDF. The code snippet in Example 4 illustrates this PL/Python UDF.

--------------------------------------------------------------------------------------------------------------
-- Gautam Muralidhar, Srivatsan Ramanujam, Oct-2014
-- PL/Python UDF, which calls the native Canny edge detection application
-- Input: the raw image byte stream encoded as a comma-separated string and stored in a column in a HAWQ table
-- Output: a composite type as defined in Example 5.
--------------------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION canny_edge_detect(img varchar)
RETURNS canny_output_type
AS
$$
import ctypes
from array import array
from numpy.ctypeslib import ndpointer
libfile = '/usr/local/lib/ds/canny_edge_detection.so'
# Load the shared object of the native Canny edge detection application.
# GD is a global dictionary supplied by PL/Python that is available to each user session,
# so the library is loaded only once per session.
if not GD.has_key('canny_edge_detection'):
    GD['canny_edge_detection'] = ctypes.cdll.LoadLibrary(libfile)
dl = GD['canny_edge_detection']
# Prepare to call the native getImgSizeFromByteStream function to determine the image size
getImgSize = dl.getImgSizeFromByteStream
# Specify the input argument types of the native function using ctypes mappings
getImgSize.argtypes = [ctypes.c_char_p, ctypes.c_uint]
# Specify the result type of the native function as an ndpointer from numpy.ctypeslib
getImgSize.restype = ndpointer(dtype=ctypes.c_uint, shape=(2,))
# Map the comma-separated string representing the image byte stream to a character buffer
buf = array('b', map(int, img.split(','))).tostring()
# Call the native getImgSizeFromByteStream function
imgSize = getImgSize(ctypes.c_char_p(buf), ctypes.c_uint(len(buf)))
# Repeat the above steps to call the native edgeDetectionFromByteStream function
edgeDetect = dl.edgeDetectionFromByteStream
edgeDetect.argtypes = [ctypes.c_char_p, ctypes.c_uint]
edgeDetect.restype = ndpointer(dtype=ctypes.c_uint, shape=(imgSize[0]*imgSize[1],))
result = edgeDetect(ctypes.c_char_p(buf), ctypes.c_uint(len(buf)))
# Return the composite type
return [imgSize[0], imgSize[1], result]
$$ LANGUAGE PLPYTHONU;

Example 4: PL/Python UDF for invoking the C++ application

The PL/Python UDF canny_edge_detect takes as input an image whose byte stream is encoded as a comma-separated string (varchar). The UDF uses the Python ctypes library to load the C++ shared object. Since our code is in C++ and the ctypes library is built for C, we enclose the native functions getImgSizeFromByteStream and edgeDetectionFromByteStream in extern "C" blocks to prevent the C++ compiler from mangling their names, as illustrated in Figure 1.
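As a quick illustration of why the extern "C" wrapper matters: ctypes resolves functions by their exported symbol name, so a name-mangled C++ symbol would not be found under its plain name. A minimal sketch, assuming the shared object has been installed at the path used above:

# Sketch: confirm the native functions are exported with C linkage
# (the .so path matches the one used in the UDF above).
import ctypes

lib = ctypes.cdll.LoadLibrary('/usr/local/lib/ds/canny_edge_detection.so')
try:
    # This lookup succeeds only because the function was declared inside
    # extern "C"; with C++ linkage the exported symbol would be mangled
    # and the plain name would not resolve.
    fn = lib.edgeDetectionFromByteStream
    print('found symbol edgeDetectionFromByteStream')
except AttributeError:
    print('symbol not exported with C linkage')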

The UDF returns a composite type called canny_output_type, which comprises three fields: the number of rows in the image, the number of columns in the image, and the edge detection result as an integer array of length (number of rows x number of columns). The composite type definition is shown in Example 5.

create type canny_output_type
as
(
nrows int,
ncols int,
edges int[]
);

Example 5: Composite output type of the PL/Python UDF

4) Ingesting the Image Data into a HAWQ Table and Calling the PL/Python UDF

The image files (e.g., JPEG files) reside on HDFS. We ingest the raw image byte stream into HAWQ as follows:

  1. The images are first packed into a text file of tab-separated key-value pairs, where the key is the image name on HDFS and the value is the image byte stream encoded as a comma-separated string.
  2. The text file is generated using an Apache Hadoop® map job, with the mapper class illustrated in Example 6 (a local Python equivalent is sketched after Example 6). While there are more efficient ways of persisting images on HDFS, we chose this simpler approach to keep the focus on our main goal.
public static class ImageToSequenceMapper extends Mapper<LongWritable, Text, Text, Text> {
    // The input to each mapper is a set of lines from a text file on HDFS, where each line contains the path to an HDFS image file.
    // The job input format is NLineInputFormat and the job output format is TextOutputFormat.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The input argument value contains the HDFS path to the image file
        String imgName = value.toString();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream in = null;
        // Create an input stream to the image file on HDFS and read the file into a byte array in memory
        in = fs.open(new Path(imgName));
        byte[] buffer = new byte[in.available()];
        in.readFully(0, buffer, 0, in.available());
        // Convert the byte array to a comma-separated string
        String byteString = Arrays.toString(buffer);
        byteString = byteString.replace("[", "").replace("]", "");
        // Write the image name and the image byte string as a tab-separated key-value pair to the job context
        context.write(value, new Text(byteString));
        // Close the input stream
        IOUtils.closeStream(in);
    }
}

Example 6: Mapper class for generating a text file of tab-separated key-value pairs, where the key is the image name on HDFS and the value is the image byte stream encoded as a comma-separated string
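For small experiments, the same tab-separated layout can be produced without a Hadoop job. The sketch below is not part of the original pipeline: it assumes the images sit in a local directory (the images/ directory and imgseqfile.txt names are hypothetical), and it mirrors the signed byte representation produced by Java's Arrays.toString(), since the PL/Python UDF decodes the string with array('b', ...). The resulting file can then be copied to HDFS.

# Sketch: build a tab-separated <image path, comma-separated bytes> file locally
# ('images/' and 'imgseqfile.txt' are hypothetical names).
import glob

with open('imgseqfile.txt', 'w') as out:
    for path in glob.glob('images/*.jpg'):
        with open(path, 'rb') as f:
            data = f.read()
        # Mirror Java's Arrays.toString() on byte[], which emits signed values
        # (-128..127); the PL/Python UDF decodes these with array('b', ...)
        byte_string = ','.join(str(b if b < 128 else b - 256) for b in bytearray(data))
        out.write('%s\t%s\n' % (path, byte_string))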

Once the text file has been generated on HDFS, we can expose the image data to HAWQ as an external table via PXF, as illustrated in Example 7. Essentially, we store the image name and the comma-separated string representing the image byte stream as columns in a HAWQ table. This approach works as long as the encoded image is smaller than 1 GB, which is usually the case in many applications (for example, even high-resolution mammograms in healthcare are around 5-10 MB in size). Applications that need to process images larger than 1 GB can leverage the in-database image processing approach described in one of our earlier blog posts about in-database image processing.
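As a rough check against the 1 GB limit, the sketch below (with a hypothetical local sample.jpg) measures how much the comma-separated text encoding inflates an image: each byte becomes at most a sign, three digits, and a comma, so a 5-10 MB image stays comfortably below the limit even after a roughly 3-4x expansion.

# Sketch: estimate the expansion of the comma-separated text encoding
# relative to the raw image bytes ('sample.jpg' is a hypothetical local file).
with open('sample.jpg', 'rb') as f:
    raw = f.read()

encoded = ','.join(str(b if b < 128 else b - 256) for b in bytearray(raw))
print('raw bytes      :', len(raw))
print('encoded length :', len(encoded))
print('expansion      : %.2fx' % (float(len(encoded)) / len(raw)))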

For better performance of data-parallel workloads, a regular HAWQ table is then created from the external table, with the image name specified as the explicit distribution key. This is also illustrated in Example 7.

create external table ocv.src_image_ext (
    img_name varchar,
    img varchar
)
LOCATION ('pxf://hdm1:50070/user/user-name/opencvexample/imgseqfile/part-r-00000?Profile=HdfsTextSimple')
FORMAT 'TEXT' (delimiter = E'\t');

create table ocv.src_image as
(select * from ocv.src_image_ext)
DISTRIBUTED BY (img_name);

Example 7: Data ingestion into HAWQ

Once the data is ingested into HAWQ, the PL/Python UDF is invoked as a normal SQL command, as illustrated in Example 8. The elements of the composite type returned by the PL/Python UDF are stored as individual columns in the canny_edge_table in HAWQ.

create table ocv.canny_edge_table
as
(
select img_name,
(canny_edge_detect(img)).*
from ocv.src_image
);

Example 8: Invoking the PL/Python UDF

5) Displaying the Results

Finally, the results of our edge detection application can be inspected via the convenient pandas-via-psql command line tool, which was developed at Pivotal. If you have Anaconda Python, you can simply run the following command to install this visualization utility:

pip install ppsqlviz

Figures 2 and 3 illustrate the pandas-via-psql command and the result of our edge detection application on a sample image.

psql -d <dbname> -h <HAWQ master hostname> -U <username> -c "select nrows, ncols, edges from ocv.canny_edge_table limit 1;" | python -m ppsqlviz.plotter image

Figure 2: Displaying the result using pandas-via-psql utility


Figure 3: Example edge detection result

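If you prefer to inspect results from a Python session rather than through the command line pipe, a hedged alternative is to pull a row over a regular database connection and reshape the flat edges array back into the image grid. This sketch assumes psycopg2 and matplotlib are available and uses placeholder connection parameters.

# Sketch: fetch one edge map from HAWQ and display it
# (connection parameters are placeholders; assumes psycopg2 and matplotlib).
import numpy as np
import psycopg2
import matplotlib.pyplot as plt

conn = psycopg2.connect(host='hawq-master', dbname='mydb', user='gpadmin')
cur = conn.cursor()
cur.execute('select nrows, ncols, edges from ocv.canny_edge_table limit 1;')
nrows, ncols, edges = cur.fetchone()

# psycopg2 returns the int[] column as a Python list; reshape it to nrows x ncols
edge_img = np.array(edges, dtype=np.uint8).reshape(nrows, ncols)
plt.imshow(edge_img, cmap='gray')
plt.show()
conn.close()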

With the image edges available in HAWQ, we can now proceed in a similar manner with other steps of a computer vision workflow (e.g., object recognition workflow), such as feature computation and machine learning in HAWQ.

Pros and Cons: PL/Python UDFs and ctypes

A huge advantage of integrating native C++ applications via PL/Python UDFs in HAWQ is that existing native applications can be integrated with essentially no change to how function parameters and data are passed to and from the native functions. In many scenarios, an engineer's time is more valuable than the time a system spends executing the application. Given the ease of use and flexibility of a language like Python, we can easily port native code to the MPP platform and run it at scale.

In PL/Python, the Python interpreter runs within the native Greenplum/PostgreSQL process while executing a query, bringing the power and flexibility of Python to a query language like SQL with minimal overhead. However, dynamically typed, interpreted languages like Python are considerably slower than compiled languages such as C and C++. In our sample tests, the PL/Python and ctypes approach took 158 seconds to run edge detection on a small distributed dataset of 907 images on a 16-node HAWQ cluster. We did not quite achieve linear scalability, and this is not an apples-to-apples comparison (standalone C++ versus HDFS disk reads with SQL invoking Python ctypes invoking C++), but it illustrates how native applications can continue to work on an MPP platform with an immediate performance boost while engineering teams gradually port their code to be MPP native.

To summarize, in this post we introduced the task of edge detection and showed a comprehensive example of how a native application written in C++ was scaled on Pivotal’s MPP platform through PL/Python. In part two we will demonstrate how the same task can be achieved via PL/C.

Editor’s Note: Apache, Apache Hadoop, Hadoop, and the yellow elephant logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
