Deploy Applications to Spark Using the MATLAB API for Spark

Supported Platform: Linux® only.

This example shows you how to deploy a standalone application to Spark™ using the MATLAB® API for Spark. Your application can be deployed against Spark using one of two supported cluster managers: local and Hadoop® YARN. This example shows you how to deploy your application using both cluster managers. For a discussion on cluster managers, see Cluster Managers Supported by Spark.

Goal: Count the number of unique airlines in the given dataset.

Dataset: airlinesmall.csv

Description: Airline departure and arrival information from 1987-2008.

Location: /usr/local/MATLAB/R2024a/toolbox/matlab/demos

Helper Function

Create a MATLAB file named carrierToCount.m with the following code:

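function results = carrierToCount(input)
    % input is a cell array whose first element is a table
    tbl = input{1};
    intermKeys = tbl.UniqueCarrier;
    % Find the unique carriers and an index mapping each row to its carrier
    [intermKeys, ~, idx] = unique(intermKeys);
    % Count the rows for each unique carrier
    intermValues = num2cell(accumarray(idx, ones(size(idx))));
    % Pair each carrier with its count as a {key, value} cell
    results = cellfun( @(x,y) {x,y} , ...
        intermKeys, intermValues, ...
        'UniformOutput',false);

This helper function is passed in as a function handle to one of the methods in the example.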
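
Before deploying, the grouping logic can be checked in plain MATLAB. The following sketch repeats the body of carrierToCount inline on a small, hypothetical table. The six carrier codes are made up for illustration and are not taken from airlinesmall.csv, and the variable name chunk is an assumption. It assumes the input is a one-element cell array holding a table with a UniqueCarrier variable, which is what the function above indexes.

```matlab
% Hypothetical sample standing in for one chunk of the datastore
tbl = table({'AA'; 'UA'; 'AA'; 'WN'; 'UA'; 'AA'}, ...
    'VariableNames', {'UniqueCarrier'});
chunk = {tbl};   % same shape of input that carrierToCount reads via input{1}

% Same steps as carrierToCount, inlined so the sketch is self-contained
[intermKeys, ~, idx] = unique(chunk{1}.UniqueCarrier);
intermValues = num2cell(accumarray(idx, 1));
results = cellfun(@(x, y) {x, y}, intermKeys, intermValues, ...
    'UniformOutput', false);

% Print each {carrier, count} pair
for i = 1:numel(results)
    fprintf('%s: %d\n', results{i}{1}, results{i}{2});
end
```

For this sample the pairs are {'AA',3}, {'UA',2}, and {'WN',1}, in alphabetical order because unique sorts its output.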

Note

If you are using Spark version 1.6 or higher, you will need to increase the Java® heap size in MATLAB to at least 512MB. For information on how to increase the Java heap size in MATLAB, see Java Heap Memory Preferences.

Local

A local cluster manager represents a pseudo Spark-enabled cluster and works in non-distributed mode on a single machine. It can be configured to use one worker thread or, on a multi-core machine, multiple worker threads. In applications, it is denoted by the word local. A local cluster manager is handy for debugging your application before full-blown deployment on a Spark-enabled Hadoop cluster.
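
As a small sketch of how the thread count enters the configuration: the procedure in this section passes 'local[1]' as the 'Master' value, and Spark's local master string takes the form local[N] for N worker threads. The variable names numThreads and master are hypothetical and used only here.

```matlab
% Hypothetical thread count; 1 reproduces the value used in this example
numThreads = 1;

% Build the master string in the local[N] form
master = sprintf('local[%d]', numThreads);
disp(master)
```

The resulting character vector is what would be supplied as the 'Master' value when the SparkConf object is created.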

Prerequisites

  1. Start this example by creating a new work folder that is visible to the MATLAB search path.

  2. Create the helper function carrierToCount.m mentioned above.

Procedure

  1. Specify Spark properties.

    Use a containers.Map object to specify Spark properties.

    sparkProp = containers.Map(...
        {'spark.executor.cores',...
        'spark.matlab.worker.debug'},...
        {'1',...
        'true'});

    Spark properties indicate the Spark execution environment of the application that is being deployed. Every application must be configured with specific Spark properties in order for it to be deployed.

    For more information on Spark properties, expand the prop value of the 'SparkProperties' name-value pair in the Input Arguments section of the SparkConf class.

  2. Create a SparkConf object.

    Use the class matlab.compiler.mlspark.SparkConf to create a SparkConf object. A SparkConf object stores the configuration parameters of the application being deployed to Spark. The configuration parameters of an application are passed on to a Spark cluster through a SparkContext.

    conf = matlab.compiler.mlspark.SparkConf(...
        'AppName', 'mySparkAppDepLocal', ...
        'Master', 'local[1]', ...
        'SparkProperties', sparkProp );

    For more information on SparkConf, see matlab.compiler.mlspark.SparkConf.

  3. Create a SparkContext object.

    Use the class matlab.compiler.mlspark.SparkContext with the SparkConf object as an input to create a SparkContext object.

    sc = matlab.compiler.mlspark.SparkContext(conf);

    A SparkContext object serves as an entry point to Spark by initializing a connection to a Spark cluster. It accepts a SparkConf object as an input argument and uses the parameters specified in that object to set up the internal services necessary to establish a connection to the Spark execution environment.

    For more information on SparkContext, see matlab.compiler.mlspark.SparkContext.

  4. Create an RDD object from the data.

    Use the MATLAB function datastore to create a datastore object pointing to the file airlinesmall.csv. Then use the SparkContext method datastoreToRDD to convert the datastore object to a Spark RDD object.

    % Create a MATLAB datastore (LOCAL)
    ds = datastore('airlinesmall.csv',...
        'TreatAsMissing','NA', ...
        'SelectedVariableNames','UniqueCarrier');
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    In general, input RDDs can be created using the following methods of the SparkContext class: parallelize, datastoreToRDD, and textFile.

  5. Perform operations on the RDD object.

    Use a Spark RDD method such as flatMap to apply a function to all elements of the RDD object and flatten the results. The function carrierToCount that was created earlier serves as the function that is going to be applied to the elements of the RDD. A function handle to the function carrierToCount is passed as an input argument to the flatMap method.

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Name: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Delete Spark Context
    delete(sc)

    In general, you provide MATLAB function handles or anonymous functions as input arguments to Spark RDD methods known as transformations and actions. These function handles and anonymous functions are executed on the workers of the deployed application.

    For a list of supported RDD transformations and actions, see Transformations and Actions in the Methods section of the RDD class.

    For more information on transformations and actions, see Apache Spark Basics.

  6. Create a standalone application.

    Use the mcc command with the -m flag to create a standalone application. The -m flag creates a standard executable that can be run from a command line. The -a flag includes the dependent dataset airlinesmall.csv from the folder <matlabroot>/toolbox/matlab/demos. The mcc command automatically picks up the dependent file carrierToCount.m as long as it is in the same work folder.

    >> mcc -m deployToSparkMlApiLocal.m -a <matlabroot>/toolbox/matlab/demos/airlinesmall.csv

    The mcc command creates a shell script run_deployToSparkMlApiLocal.sh to run the executable file deployToSparkMlApiLocal.

    For more information, see mcc.

  7. Run the standalone application from a Linux shell using the following command:

    $ ./run_deployToSparkMlApiLocal.sh /share/MATLAB/MATLAB_Runtime/v91

    /share/MATLAB/MATLAB_Runtime/v91 is an argument indicating the location of the MATLAB Runtime.

    Prior to executing the above command, make sure the javaclasspath.txt file is in the same folder as the shell script and the executable.

    Your application will fail to execute if it cannot find the file javaclasspath.txt.

    Your application may also fail to execute if the optional line in javaclasspath.txt containing the folder location of the Hadoop configuration files is uncommented. To execute your application on the local cluster manager, this line must be commented out. Uncomment this line only if you plan to run your application using yarn-client as your cluster manager on a Spark-enabled Hadoop cluster.

  8. You will see the following output:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

Code:

 deployToSparkMlApiLocal.m
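
To see what the flatMap, reduceByKey, and collect calls in step 5 compute without a Spark installation, the following sketch emulates them in ordinary MATLAB. It is an illustration of the semantics only, not how Spark executes the job and not the contents of deployToSparkMlApiLocal.m. The two partitions, their counts, and the names part1, part2, pairs, reducer, totals, and carrierKeys are hypothetical.

```matlab
% Two hypothetical partitions of {carrier, count} pairs, shaped like the
% output carrierToCount would return for two chunks of the datastore
part1 = {{'AA', 3}, {'UA', 2}};
part2 = {{'AA', 1}, {'WN', 4}};

% flatMap: the per-chunk results are flattened into one list of pairs
pairs = [part1, part2];

% reduceByKey: values sharing a key are merged with the reducer function
reducer = @(acc, value) acc + value;
totals = containers.Map('KeyType', 'char', 'ValueType', 'double');
for i = 1:numel(pairs)
    key = pairs{i}{1};
    value = pairs{i}{2};
    if isKey(totals, key)
        totals(key) = reducer(totals(key), value);
    else
        totals(key) = value;
    end
end

% collect: gather the merged pairs into a cell array of {key, value} cells
carrierKeys = keys(totals);
countdata = cellfun(@(k) {k, totals(k)}, carrierKeys, ...
    'UniformOutput', false);

% Display the pairs the same way the example does
for i = 1:numel(countdata)
    fprintf('Carrier Name: %s, Count: %d\n', countdata{i}{1}, countdata{i}{2});
end
```

Here the two 'AA' pairs merge into a single pair with count 4, while 'UA' and 'WN' pass through unchanged.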

Hadoop YARN

A yarn-client cluster manager represents a Spark-enabled Hadoop cluster. The YARN cluster manager was introduced in Hadoop 2.0. It is typically installed on the same nodes as HDFS™. Therefore, running Spark on YARN lets Spark access HDFS data easily. In applications, it is denoted using the word yarn-client.

Since the steps for deploying your application using yarn-client as your cluster manager are similar to using the local cluster manager shown above, the steps are presented with minimal discussion. For a detailed discussion of each step, check the Local case above.

Note

You can follow the same instructions to deploy Spark applications created using the MATLAB API for Spark to CLOUDERA® CDH. An example is available on MATLAB Answers™.

To use CLOUDERA CDH encryption zones, add the JAR file commons-codec-1.9.jar to the static classpath of MATLAB Runtime. Location of the file: $HADOOP_PREFIX/lib/commons-codec-1.9.jar, where $HADOOP_PREFIX is the location where Hadoop is installed.

Prerequisites

  1. Start this example by creating a new work folder that is visible to the MATLAB search path.

  2. Install the MATLAB Runtime in a folder that is accessible by every worker node in the Hadoop cluster. This example uses /share/MATLAB/MATLAB_Runtime/v91 as the location of the MATLAB Runtime folder.

    If you don’t have the MATLAB Runtime, you can download it from the website at: https://www.mathworks.com/products/compiler/mcr.

  3. Copy the file airlinesmall.csv into the Hadoop Distributed File System (HDFS) folder /user/<username>/datasets. Here <username> refers to your username in HDFS.

    $ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets

Procedure

  1. Set the environment variable HADOOP_PREFIX to point to your Hadoop install folder. This variable is necessary for submitting jobs to your Hadoop cluster.

    setenv('HADOOP_PREFIX','/share/hadoop/hadoop-2.6.0')

    The HADOOP_PREFIX environment variable must be set when using the MATLAB datastore function to point to data on HDFS. Setting this environment variable has nothing to do with Spark. See Relationship Between Spark and Hadoop for more information.

  2. Specify Spark properties.

    Use a containers.Map object to specify Spark properties.

    sparkProperties = containers.Map( ...
        {'spark.executor.cores',...
        'spark.executor.memory',...
        'spark.yarn.executor.memoryOverhead',...
        'spark.dynamicAllocation.enabled',...
        'spark.shuffle.service.enabled',...
        'spark.eventLog.enabled',...
        'spark.eventLog.dir'}, ...
        {'1',...
        '2g',...
        '1024',...
        'true',...
        'true',...
        'true',...
        'hdfs://hadoop01glnxa64:54310/user/<username>/sparkdeploy'});

    For more information on Spark properties, expand the prop value of the 'SparkProperties' name-value pair in the Input Arguments section of the SparkConf class.

  3. Create a SparkConf object.

    Use the class matlab.compiler.mlspark.SparkConf to create a SparkConf object.

    conf = matlab.compiler.mlspark.SparkConf( ...
        'AppName','myApp', ...
        'Master','yarn-client', ...
        'SparkProperties',sparkProperties);

    For more information on SparkConf, see matlab.compiler.mlspark.SparkConf.

  4. Create a SparkContext object.

    Use the class matlab.compiler.mlspark.SparkContext with the SparkConf object as an input to create a SparkContext object.

    sc = matlab.compiler.mlspark.SparkContext(conf);

    For more information on SparkContext, see matlab.compiler.mlspark.SparkContext.

  5. Create an RDD object from the data.

    Use the MATLAB function datastore to create a datastore object pointing to the file airlinesmall.csv in HDFS. Then use the SparkContext method datastoreToRDD to convert the datastore object to a Spark RDD object.

    % Create a MATLAB datastore (HADOOP)
    ds = datastore(...
        'hdfs:///user/<username>/datasets/airlinesmall.csv',...
        'TreatAsMissing','NA',...
        'SelectedVariableNames','UniqueCarrier');
    
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    In general, input RDDs can be created using the following methods of the SparkContext class: parallelize, datastoreToRDD, and textFile.

  6. Perform operations on the RDD object.

    Use a Spark RDD method such as flatMap to apply a function to all elements of the RDD object and flatten the results. The function carrierToCount that was created earlier serves as the function that is going to be applied to the elements of the RDD. A function handle to the function carrierToCount is passed as an input argument to the flatMap method.

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Name: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Save results to MAT file
    save('countdata.mat','countdata');
    
    % Delete Spark Context
    delete(sc);

    For a list of supported RDD transformations and actions, see Transformations and Actions in the Methods section of the RDD class.

    For more information on transformations and actions, see Apache Spark Basics.

  7. Create a standalone application.

    Use the mcc command with the -m flag to create a standalone application. The -m flag creates a standalone application that can be run from a command line. You do not need to attach the dataset airlinesmall.csv since it resides on HDFS. The mcc command automatically picks up the dependent file carrierToCount.m as long as it is in the same work folder.

    >> mcc -m deployToSparkMlApiHadoop.m

    The mcc command creates a shell script run_deployToSparkMlApiHadoop.sh to run the executable file deployToSparkMlApiHadoop.

    For more information, see mcc.

  8. Run the standalone application from a Linux shell using the following command:

    $ ./run_deployToSparkMlApiHadoop.sh /share/MATLAB/MATLAB_Runtime/v91

    /share/MATLAB/MATLAB_Runtime/v91 is an argument indicating the location of the MATLAB Runtime.

    Prior to executing the above command, make sure the javaclasspath.txt file is in the same folder as the shell script and the executable.

    Your application will fail to execute if it cannot find the file javaclasspath.txt.

    Your application may also fail to execute if the optional line in javaclasspath.txt containing the folder location of the Hadoop configuration files is commented out. To execute your application on a yarn-client cluster manager, this line must be uncommented. Comment out this line only if you plan to run your application using a local cluster manager.

  9. You will see the following output:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

    Note

    If the application being deployed is a MATLAB function as opposed to a MATLAB script, use the following execution syntax:

    $ ./run_<applicationName>.sh \
      <MATLAB_Runtime_Location> \
      [Spark arguments] \
      [Application arguments]

    For example:

    $ ./run_deployToSparkMlApiHadoop.sh \
       /usr/local/MATLAB/MATLAB_Runtime/v91 \
       yarn-client \
       hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
       hdfs://host:54310/user/<username>/result
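
Step 6 of this procedure saves the collected results to countdata.mat. The following is a minimal sketch of reading such a file back and tabulating it, assuming countdata is a cell array of {carrier, count} pairs, as the display loop in step 6 implies. The three pairs reuse counts from the sample output. The temporary file and the names matFile, loaded, and resultTbl are illustrative only.

```matlab
% Hypothetical stand-in for the cell array returned by redrdd.collect()
countdata = {{'AA', 14930}, {'WN', 15931}, {'9E', 521}};

% Save to a temporary MAT file so the sketch leaves no files behind
matFile = [tempname, '.mat'];
save(matFile, 'countdata');

% Load the file and split the pairs into carriers and counts
loaded = load(matFile);
carriers = cellfun(@(p) p{1}, loaded.countdata, 'UniformOutput', false);
counts = cellfun(@(p) p{2}, loaded.countdata);

% Tabulate and sort by count, largest first
resultTbl = table(carriers(:), counts(:), ...
    'VariableNames', {'Carrier', 'Count'});
resultTbl = sortrows(resultTbl, 'Count', 'descend');
disp(resultTbl)

% Remove the temporary file
delete(matFile);
```

With these three pairs, the sorted table lists WN first, then AA, then 9E.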

Code:

 deployToSparkMlApiHadoop.m