NAG and Cloudera™
This document describes how to use the NAG Library with the Spark programming model. The instructions given here are specific to the Cloudera™ environment, and several test problems are also discussed.
For more information on using the NAG Library in Spark or any of the other topics touched upon in this article please contact us.
General Guidelines for Calling the NAG Library From Spark
The fundamental datatype used in Spark is the Resilient Distributed Dataset (RDD). An RDD acts as a pointer to your distributed data on the filesystem. This object has intuitive methods (count, sample, filter, map/reduce, etc.) and lazy evaluation that allow for fast and easy manipulation of distributed data.
Below is a simple Python example of using the NAG Library in Spark to calculate the cumulative Normal distribution function on a set of numbers (the message passing output from Spark has been omitted):
SparkContext available as sc
>>> from nag4py.s import nag_cumul_normal
>>> myNumbers = sc.parallelize( [-2.0, -1.0, 0.0, 1.0, 2.0] )
>>> myNumbers.takeSample(False, 5, 0)
[ 0.0, -2.0, -1.0, 1.0, 2.0]
>>> myNumbers.map( nag_cumul_normal ).takeSample(False, 5, 0)
[0.5, 0.02275, 0.15865, 0.84134, 0.97725]
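For readers who want to cross-check the values above without a Spark cluster or the NAG Library, the following is a minimal sketch in plain Python. The function `cumul_normal` is a hypothetical stand-in written for this illustration (it is not the nag4py routine), and the built-in `map` stands in for the RDD `map` method.

```python
import math

def cumul_normal(x):
    """Standard Normal cumulative distribution P(X <= x).

    Hypothetical stand-in for nag_cumul_normal, built on the
    complementary error function from the standard library.
    """
    return 0.5 * math.erfc(-x / math.sqrt(2.0))

numbers = [-2.0, -1.0, 0.0, 1.0, 2.0]

# The built-in map plays the role of RDD.map in this local check.
values = list(map(cumul_normal, numbers))

for x, p in zip(numbers, values):
    print(x, p)
```

The values should agree with the Spark output to the digits shown there, up to rounding in the last displayed digit.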
It should be noted that the vast majority of the algorithms in the NAG Library require all relevant data to be held in memory. This may seem at odds with the Spark ecosystem; however, when working with large datasets, two usage scenarios are commonly seen:
- The full dataset is split into subsets, for example a dataset covering the whole world may be split by country, county and city and an independent analysis carried out on each subset. In such cases all the relevant data for a given analysis may be held on a single node and therefore can be processed directly by NAG Library functions.
- A single analysis is required that utilizes the full dataset. In this case it is sometimes possible to reformulate the problem. For example, many statistical techniques can be reformulated as a maximum likelihood optimization problem. The objective function of such an optimization (the likelihood) can then be evaluated using the standard Spark map/reduce functions and the results fed back to one of the robust optimization routines available in the NAG Library.
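The second scenario can be sketched as follows. This is an illustrative outline only: it uses plain Python lists of rows in place of RDD partitions and the built-in `map` and `functools.reduce` in place of the Spark operations. The names `partition_neg_log_lik` and `neg_log_lik`, and the choice of a logistic model, are assumptions made for the example. An optimizer would call `neg_log_lik` once per trial value of `beta`.

```python
import math
from functools import reduce

def log_sigmoid(z):
    """Numerically stable log(1 / (1 + exp(-z)))."""
    if z >= 0:
        return -math.log1p(math.exp(-z))
    return z - math.log1p(math.exp(z))

def partition_neg_log_lik(beta, rows):
    """Map step: negative log-likelihood of one partition.

    Each row is (y, x1, ..., xk) with y in {0, 1}; beta[0] is the intercept.
    """
    total = 0.0
    for y, *x in rows:
        z = beta[0] + sum(b * xi for b, xi in zip(beta[1:], x))
        total -= y * log_sigmoid(z) + (1 - y) * log_sigmoid(-z)
    return total

def neg_log_lik(beta, partitions):
    """Reduce step: add up the per-partition contributions."""
    contributions = map(lambda rows: partition_neg_log_lik(beta, rows), partitions)
    return reduce(lambda a, b: a + b, contributions)

# Two small hypothetical "partitions" of (y, x1) rows.
partitions = [
    [(0, 0.5), (1, 2.0)],
    [(1, 1.5), (0, -1.0)],
]

print(neg_log_lik([0.0, 0.0], partitions))
```

Only the scalar objective value travels back to the driver, so the optimizer never needs the full dataset in memory.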
NAG Library Installation in a Cloudera Environment
- Download the NAG Library to be used.
- Obtain a valid NAG KUSARI license key from NAG. At present, this needs to be a non-node locked license and will be provided by NAG on a case-by-case basis.
- Upload the NAG Library and license files to the Cloudera cluster to be used. Place the files in a directory that users have access to, at the same location on ALL nodes in the cluster.
Running a Spark Job using the NAG Library in a Cloudera Environment
- Compile and prepare your application. For the Java cases presented here, sbt was used with the standard directory layout described in the Spark documentation. For Python, compilation is not necessary.
- Applications utilizing the NAG Library and Cloudera can be written using Java or Python. To date, the NAG Library has been tested in a Cloudera environment using Spark, with applications run using the "spark-submit" command. If the application is written using Java, three things must be set correctly: the locations of the NAG Java and Fortran libraries, the environment variable "NAG_KUSARI_FILE", and the NAG ".jar" wrapper file containing the NAG Java interfaces. An example spark-submit command is given below. The exact paths will be different for your system and must be set accordingly.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-cores 1 \
  --num-executors 2 \
  --class "logisticRegressionSpark2" \
  --jars /opt/nag/NAG-1.0/NAGJava/jar/NAGJava.jar \
  --conf "spark.yarn.appMasterEnv.NAG_KUSARI_FILE=/opt/nag/NAG-1.0/nag.key" \
  --conf "spark.driver.extraLibraryPath=/opt/nag/NAG-1.0/NAGJava/linux_x64/:/opt/nag/NAG-1.0/fll6i24dcl/lib/:/opt/nag/NAG-1.0/fll6i24dcl/rtl/" \
  LOGIT2/target/scala-2.10/logit2_2.10-1.0.jar 45211 1
To use the NAG Python wrappers, the NAG C Library must be added to the library path and the location of the NAG Python wrapper ".egg" file must be specified. In order to use the NAG Python wrappers you will also need the "numpy" and "matplotlib" packages installed on each node of the cluster. Note that neither the NAG Library nor the Python ".egg" file needs to be installed in a particular location; both were installed in the home directory of user "nag" for this example.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-cores 1 \
  --num-executors 2 \
  --py-files "/home/nag/nag4py-23.0/dist/nag4py-23.0-py2.7.egg" \
  --conf "spark.yarn.appMasterEnv.NAG_KUSARI_FILE=/home/nag/nag.key" \
  --conf "spark.driver.extraLibraryPath=/home/nag/cll6i23dcl/lib/" \
  log_regression_spark_v4.py model 45211 1
All tests were run using a cluster provided to NAG by Cloudera. The cluster is running in Cloudera's Mountain View data center on its own VPN. It consists of six HP DL380 gen9 servers and two HP DL360 gen9 servers. The worker nodes have 27TB of storage, 250GB of RAM, and 20 cores (40 with hyperthreading). Kerberos is enabled on the cluster using an Active Directory Server as the KDC. Centrify is also installed on the cluster for user synchronization.
The first test solved a logistic regression problem using the Apache Spark API and the NAG Library. The regression was solved by minimizing the sum of least squares of the log likelihood function for all data points in the data set. The NAG function used for the minimization was e04uc (nag_opt_nlp), which is designed to minimize an arbitrary smooth function subject to constraints (which may include simple bounds on the variables, linear constraints and smooth nonlinear constraints) using a sequential quadratic programming (SQP) method. First derivatives can be supplied to this function to improve its convergence properties. The function e04uc may also be used for unconstrained, bound-constrained and linearly constrained optimization. It uses forward communication for evaluating the objective function, the nonlinear constraint functions and any of their derivatives.
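As an illustration of what supplying first derivatives can look like, here is a minimal sketch in plain Python. It assumes the objective is the negative log-likelihood of a logistic model with an intercept; the names `objective_and_gradient` and `finite_difference` and the sample rows are hypothetical, and the code does not call e04uc. The finite-difference routine is included only to check the analytic gradient.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def objective_and_gradient(beta, rows):
    """Negative log-likelihood and its gradient for rows of (y, x1, ..., xk)."""
    value = 0.0
    grad = [0.0] * len(beta)
    for y, *x in rows:
        features = [1.0] + list(x)          # leading 1.0 is the intercept term
        z = sum(b * f for b, f in zip(beta, features))
        p = sigmoid(z)
        value -= y * math.log(p) + (1 - y) * math.log(1.0 - p)
        for j, f in enumerate(features):
            grad[j] += (p - y) * f          # d(value)/d(beta_j)
    return value, grad

def finite_difference(beta, rows, h=1e-6):
    """Central-difference estimate of the gradient, used only as a check."""
    estimate = []
    for j in range(len(beta)):
        up = list(beta)
        down = list(beta)
        up[j] += h
        down[j] -= h
        diff = objective_and_gradient(up, rows)[0] - objective_and_gradient(down, rows)[0]
        estimate.append(diff / (2 * h))
    return estimate

# Hypothetical rows of (y, x1, x2).
rows = [(0, 0.5, 1.0), (1, 2.0, -1.0), (1, 1.5, 0.3), (0, -1.0, 0.2)]
beta = [0.1, -0.2, 0.3]
value, grad = objective_and_gradient(beta, rows)
print(value, grad)
print(finite_difference(beta, rows))
```

Because both the value and the gradient are sums over rows, they can be accumulated per partition and added together in the same way as the objective itself.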
The data set used for the first test was very small and consists of Argentinian bank loan default records. Three independent variables are considered in the regression analysis: the person's age, the balance of the loan and the loan duration. The dependent variable is 1 if the loan is in default and 0 if it is not. The data set contains a total of 45,211 records. Both the Python and Java examples were run as described above.
The second test again solved a logistic regression problem as described above, but with a different data set. In this case, the data set came from the U.S. Department of Transportation, Bureau of Transportation Statistics, and consisted of on-time data for all U.S. flight records from 1987 to 2008. The dependent variable indicates whether a flight was diverted (0 - no; 1 - yes), and the independent variables selected were carrier delay, weather delay, and security delay, where each delay is the time, in minutes, that the flight was delayed due to the listed cause. Again, the regression analysis was run using the Java and Python codes.
The final test used the same logistic regression code as Test 1 and Test 2. This time a data set was constructed consisting of a dependent variable that is randomly chosen to be 0 or 1. Three independent variables were chosen for each dependent variable by randomly sampling a Normal distribution centered on a chosen logit function. This enabled the creation of a much larger data set than was used in Test 1 and Test 2. The test was used to obtain scaling results of the Java code running on the supplied cluster. The scaling results are shown in the figure below. Note that scaling is reasonable until the number of processing elements is equal to 40. For these cases the number of executor cores was varied up to 40 and the simulation was run on one node. After that, executors were added and internode communication seems to become more of a factor.
Test 4 The Linear Regression Problem
This test examines the scalability and performance of the NAG Library for Java when solving a large-scale multi-linear regression problem on Spark. The example data ranges from 2 GB up to 64 GB in the form of
We solve this problem using the Normal equations. This method allows us to map the sum-of-squares matrix computation across worker nodes. The reduce phase of Spark then combines these matrices pairwise by adding them together. In the final step, a NAG linear regression routine is called on the master node to calculate the regression coefficients. All of this happens in one pass over the data, with no iterative methods needed!
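The map, reduce and solve steps just described can be sketched in plain Python as below. This is an outline under stated assumptions, not the code used in the test: lists of rows stand in for Spark partitions, the helper names `partition_sums`, `add_sums` and `solve` are hypothetical, and a small Gaussian elimination routine takes the place of the NAG linear regression routine. Each partition contributes the matrix X^T X and the vector X^T y, and the coefficients solve (X^T X) b = X^T y.

```python
from functools import reduce

def partition_sums(rows):
    """Map step: X^T X and X^T y for one partition; rows are (y, x1, ..., xk)."""
    k = len(rows[0])                      # intercept plus (k - 1) predictors
    xtx = [[0.0] * k for _ in range(k)]
    xty = [0.0] * k
    for y, *x in rows:
        f = [1.0] + list(x)               # leading 1.0 is the intercept column
        for i in range(k):
            xty[i] += f[i] * y
            for j in range(k):
                xtx[i][j] += f[i] * f[j]
    return xtx, xty

def add_sums(a, b):
    """Reduce step: element-wise sum of two (X^T X, X^T y) pairs."""
    xtx = [[p + q for p, q in zip(ra, rb)] for ra, rb in zip(a[0], b[0])]
    xty = [p + q for p, q in zip(a[1], b[1])]
    return xtx, xty

def solve(a, b):
    """Gaussian elimination with partial pivoting (stand-in for the NAG routine)."""
    n = len(b)
    m = [row[:] + [rhs] for row, rhs in zip(a, b)]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for i in reversed(range(n)):
        tail = sum(m[i][j] * x[j] for j in range(i + 1, n))
        x[i] = (m[i][n] - tail) / m[i][i]
    return x

# Hypothetical data satisfying y = 1 + 2*x1 - 3*x2 exactly, in two "partitions".
partitions = [
    [(1.0, 0.0, 0.0), (3.0, 1.0, 0.0), (-2.0, 0.0, 1.0)],
    [(0.0, 1.0, 1.0), (2.0, 2.0, 1.0), (-3.0, 1.0, 2.0)],
]

xtx, xty = reduce(add_sums, map(partition_sums, partitions))
coefficients = solve(xtx, xty)
print(coefficients)
```

The aggregated matrix has only as many rows and columns as there are coefficients, so the final solve is cheap regardless of how many data points were processed.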
Example output from the linear regression is as follows:
Time taken for NAG analysis: 236.964 seconds
Number of Independent Variables: 5
Total Number of Points: 334800000
R-Squared = 0.699
Predicting 4 points
Prediction: 57634.9 Actual: 60293.6
Prediction: 32746.6 Actual: 35155.5
Prediction: 49917.5 Actual: 52085.3
Prediction: 82413.2 Actual: 82900.3
Obtaining help with NAG and Cloudera:
All NAG documentation is accessible from https://www.nag.co.uk/content/software-documentation.
To contact your nearest NAG Technical Support Service, please use the contact details on the NAG website.
NAG is available to help implement your algorithm(s) within a Cloudera environment. Customer needs will be discussed on a case-by-case basis and may require consultation between NAG, Cloudera and the customer using the Cloudera system. In addition, we will endeavour to work with both Cloudera and any of our customers using the NAG Library within a Cloudera environment to resolve any problems with our software or its use and configuration within a Cloudera environment.