Twister4Azure Kmeans User Guide
Latest version at
http://salsaproj.indiana.edu/Twister4Azure.html  
 

SALSA Group
PTI Indiana University
July 20th 2014

Contents

  1. Introduction
  2. Execution Guide for Twister4Azure-Kmeans
    1. Prerequisites
    2. Input
    3. Running KMeans Clustering Sample Locally
    4. Running KMeans Clustering Sample on Azure
  3. Development Guide for Twister4Azure-Kmeans
    1. Prerequisites
    2. Related Code Files
    3. Code Analysis

1. Introduction

In statistics and machine learning, k-means clustering is a method of cluster analysis which aims to partition n observations into k clusters, where each observation belongs to the cluster with the nearest mean [Wikipedia].

In Twister4Azure KMeans we first split the input data into smaller fragments and upload these fragments to a container in Azure storage. We also broadcast the initial cluster centers to all workers. Generally, data that does not change across iterations (e.g. the data points) is given as input, while data that changes (dynamic data, e.g. the cluster centers) is given as broadcast.

Then we give the container as an input to the KMeans program, where each map task picks up a file from the container. Each map task computes partial cluster centers by going through the data points of its file. A reduce task computes the average of the partial cluster centers and produces the cluster centers for the next step. The merge task, once it gets these new cluster centers, determines whether it needs to execute another cycle of MapReduce computation and, if so, broadcasts the new cluster centers to the next iteration.
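The split → map → reduce → merge cycle described above can be sketched outside the framework. The following Python sketch is a simplified stand-in for the Twister4Azure tasks; the names map_task, reduce_task, and kmeans are hypothetical, and plain lists take the place of the blob container splits and the broadcast.

```python
def map_task(points, centers):
    """Compute per-cluster partial sums for one data split.
    The last slot of each partial vector holds the point count."""
    k, dim = len(centers), len(centers[0])
    partial = [[0.0] * (dim + 1) for _ in range(k)]
    for p in points:
        nearest = min(range(k),
                      key=lambda j: sum((centers[j][d] - p[d]) ** 2
                                        for d in range(dim)))
        for d in range(dim):
            partial[nearest][d] += p[d]
        partial[nearest][dim] += 1
    return partial

def reduce_task(partials):
    """Sum the partial results of all map tasks and divide by the
    point counts to get the cluster centers for the next iteration."""
    k, width = len(partials[0]), len(partials[0][0])
    dim = width - 1
    totals = [[0.0] * width for _ in range(k)]
    for part in partials:
        for j in range(k):
            for d in range(width):
                totals[j][d] += part[j][d]
    return [[totals[j][d] / max(totals[j][dim], 1) for d in range(dim)]
            for j in range(k)]

def kmeans(splits, centers, iterations):
    """Merge step: re-broadcast the new centers until the iteration limit."""
    for _ in range(iterations):
        partials = [map_task(split, centers) for split in splits]
        centers = reduce_task(partials)
    return centers
```

In the real application each map task reads one file from the input container, and the merge task decides whether to schedule another MapReduce cycle; here a fixed iteration count stands in for that check.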
 

https://googledrive.com/host/0Bw6ZC9ZjtNobeU9oekpOd0R2Y3M/Drawing1.PNG

Figure 1. The workflow of kmeans application with Twister4Azure MapReduce framework

2. Execution Guide for Twister4Azure-Kmeans

  1. Prerequisites

Make sure you have completed following steps before proceeding with the execution of the Twister4Azure-Kmeans application.

    1. Visual Studio 2012
    2. Azure SDK: the latest version of the Twister4Azure source code supports version 1.7 of the Azure SDK. First install all components of Azure SDK version 2.1 (AzureTools, WindowsAzureAuthoringTools, Azure emulator, WindowsAzureLibsForNet, WindowsAzureTools.vs110, WindowsAzureStorageTools), then install version 1.7 of the SDK as well (AzureTools, WindowsAzureAuthoringTools, WindowsAzureLibsForNet, WindowsAzureTools.vs110, WindowsAzureTools.LightSwitch.vs110).
    3. Download the latest release of Twister4Azure and unzip it.

 

  2. Input

The set of input files for this application is located under the Twister4Azure\Samples\KMeansClustering\SampleData folder of the downloaded release. It contains two files, “kminput.zip” and “kmcenters.zip”; alternatively, follow the links in the later sections.

 

  3. Running KMeans Clustering Sample Locally
    1. Start Visual Studio 2012 as administrator and open the “Twister4Azure” solution. It asks for an upgrade or download; select the upgrade to Azure tools v2.1 and click OK.
    2. Expand the Twister4AzureCloud project and expand the roles. Right-click on the Twister4AzureUI web role and remove it. Now right-click on the Twister4AzureCloud solution and set it as the start-up project again. Now run it (F5).
    3. Create a blob container named “kminput” in your local Azure storage. Download and unzip kminput.zip. Upload the contents of the unzipped “kminput” folder to the “kminput” container. You can use a freely available third-party Azure storage client (e.g. CloudBerry Explorer for Azure Blob Storage) to perform the above.
    4. Create a blob container named “kmcenters”. Download and unzip kmcenters.zip. Upload the contents of the unzipped “kmcenters” folder to the “kmcenters” container.
    5. Open the “SampleClients” solution in a different Visual Studio instance. Make sure the “KMeansMRClient” project is selected as the startup project. Go to the project properties of the “KMeansMRClient” project and open the Debug tab. Specify the following in the “Command line arguments” text area. The arguments for the KMeans client are:

       <jobID (needs to be unique across different runs)> <inputContainerName> <outputContainerName> <NumberOfReduceTasks> <cluster centers> <vector length>

       Eg: test1 kminput kmoutput 1 kmcenters/centers_400_20 20
    6. Run the client (F5). You’ll be able to monitor the computation using the Twister4Azure monitoring console, which will open in a new browser window.

 

  4. Running KMeans Clustering Sample on Azure
    1. Open the "Twister4Azure" solution.
    2. Expand the Twister4AzureCloud project and expand the roles. Right-click on the Twister4AzureUI web role and remove it (if you have it).
    3. Double-click on the “Twister4AzureWorker” role under “Roles” in the "Twister4AzureCloud" project. Go to the “Settings” tab. Click on the “DataConnectionString” setting, click on “...” in the value, and select "Enter storage account credentials". Enter your Azure storage account credentials.
      http://salsahpc.indiana.edu/twister4azure/docs-images/storage-cred.png
    4. Do the same for "DiagnosticsConnectionString" and "Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString". Make sure you perform this configuration for the worker role (Twister4AzureWorker).
    5. Follow the tutorials from MSDN on deploying Azure applications directly from Visual Studio.
    6. Upload the sample data to your Azure storage account similarly to the way we uploaded it to local storage.
    7. Open the “SampleClients” solution in a different Visual Studio instance. Make sure the “KMeansMRClient” project is selected as the startup project. Go to the project properties of the “KMeansMRClient” project and open the Debug tab. Specify the following in the “Command line arguments” text area. The arguments for the KMeans client are:

       <jobID (needs to be unique across different runs)> <inputContainerName> <outputContainerName> <NumberOfReduceTasks> <cluster centers> <vector length>

       Eg: test1 kminput kmoutput 1 kmcenters/centers_400_20 20
    8. https://googledrive.com/host/0Bw6ZC9ZjtNobeU9oekpOd0R2Y3M/Credentials.png

      Open "ClientCredentials.cs" in the "Credentials" project. Comment out the return statement (return Cloud....Develo..;). Uncomment the block comment above the return statement and specify the details of your Azure Storage Account.

 

Account is the name of the storage account and Key is the access key for your account. Go to the Azure portal --> Storage accounts, click on your storage account, then click "Manage access keys"; this is where you can find the account name and the key. On the top tab, click the dashboard, and you can find the URLs there for blob, table, etc.

    9. Run the client (F5).

 

3. Development Guide for Twister4Azure-Kmeans

The following sections illustrate how to write a KMeans application with the Twister4Azure MapReduce framework. The KMeans application is a little more complex than the Word Count application: it requires multiple iterations of the MapReduce computation, and it employs both constant data and variable data across the iterations. Here we describe how to write an iterative MapReduce application in Twister4Azure and the API used to schedule a job for such an application, specifically KMeans.

Each map task in the Twister4Azure KMeans application gets a set of data points. These data points do not change over the course of the iterations, and map tasks access them in each iteration. The invariant nature of this data marks it as static in Twister4Azure; thus, the data points are loaded only once for the entire execution. The cluster centers computed in each iteration are dynamic data, as they change between iterations. Also, the calculated cluster centers of a particular iteration serve as the input for the next iteration.
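The static/dynamic distinction can be illustrated with a minimal caching sketch (Python, with hypothetical names; not Twister4Azure's actual API): the static split is loaded once and reused, while the dynamic centers arrive fresh each iteration.

```python
class MapWorker:
    """Illustrative sketch: static data (the point split) is loaded once
    and cached, while dynamic data (the cluster centers) is received
    anew in every iteration."""

    def __init__(self, load_split):
        self._load_split = load_split   # expensive load, e.g. from blob storage
        self._points = None             # cached static data
        self.loads = 0                  # how often the split was actually loaded

    def run_iteration(self, centers):
        if self._points is None:        # load the split only on first use
            self._points = self._load_split()
            self.loads += 1
        # ... compute partial centers from self._points and centers ...
        return len(self._points), len(centers)

# Three iterations over the same split trigger only one load.
worker = MapWorker(lambda: [[0.0, 0.0], [1.0, 1.0]])
for it in range(3):
    worker.run_iteration([[0.5, 0.5]])
```

This is the motivation for marking the data points static: the cost of reading them from storage is paid once per execution rather than once per iteration.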

  1. Prerequisites

 

  2. Related Code Files

The source code for Twister4Azure-Kmeans is located in $TWISTER4Azure_HOME\Samples\KMeansClustering\KMeansWorker. The source files are analysed in the later sections.

  3. Code Analysis
        protected override int Map(IntKey key, MatrixValue<double> matrixValue,
            List<KeyValuePair<IntKey, MatrixValue<double>>> dynamicData,
            IOutputCollector<IntKey, DoubleVectorValue> outputCollector, string programArgs)
        {
            double[][] centroids = dynamicData[0].Value.Matrix;
            int vectorLength = dynamicData[0].Value.Width;

            double[][] newCentroids = GetNewCentroids(centroids, vectorLength, matrixValue);

            for (int i = 0; i < newCentroids.Length; i++)
            {
                outputCollector.Collect(IntKey.GetInstance(i), DoubleVectorValue.GetInstance(newCentroids[i]));
            }

            return 0;
        }

        private double[][] GetNewCentroids(double[][] centroids, int vectorLength, MatrixValue<double> matrixValue)
        {
            double[][] newCentroids = new double[centroids.Length][];

            for (int i = 0; i < newCentroids.Length; i++)
            {
                newCentroids[i] = new double[vectorLength + 1];
            }

            double[][] valueMatrix = matrixValue.Matrix;
            int numCentroids = centroids.Length;

            foreach (double[] t in valueMatrix)
            {
                double min = 0;
                int minCentroid = 0;
                for (int j = 0; j < numCentroids; j++)
                {
                    double euDistance = 0;
                    for (int i = 0; i < vectorLength; i++)
                    {
                        double distance = (centroids[j][i] - t[i]);
                        euDistance += distance * distance;
                    }

                    if (j == 0)
                    {
                        min = euDistance;
                    }
                    else if (euDistance < min)
                    {
                        min = euDistance;
                        minCentroid = j;
                    }
                }
                for (int i = 0; i < vectorLength; i++)
                {
                    newCentroids[minCentroid][i] += t[i];
                }
                newCentroids[minCentroid][vectorLength] += 1;
            }
            return newCentroids;
        }

Map method signature:

The generic parameters of the mapper are <INKEY, INVALUE, OUTKEY, OUTVALUE, BCASTINKEY, BCASTINVALUE>, where INKEY is the data type of the key for the input to the map task, INVALUE is the data type of the value for the input to the map task, the OUT parameters represent the data types for the output of the map tasks, and the BCAST parameters represent the data types of the dynamic data (the cluster centers in this case).

Reading the dynamic data:

The first two statements of the method body initialize the cluster center matrix with the centers from the dynamic data (the centers for the current iteration).

Computing the partial centers:

The call to GetNewCentroids calculates the partial centers, that is, the centers according to the data that each map task holds.

Collecting the output:

The for loop collects the partial centers to the output collector, so at the end of all the map tasks the output collector holds all the partial centers.

GetNewCentroids:

This is the code that calculates the partial centers; it is included here for reference.
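The partial-center computation can also be read as the following framework-free sketch (Python, hypothetical function name), which mirrors the GetNewCentroids logic above, including the extra slot that holds the number of points assigned to each centroid:

```python
def get_new_centroids(centroids, vector_length, value_matrix):
    """Accumulate each point into its nearest centroid's partial sum;
    slot vector_length of each partial vector holds the point count."""
    new_centroids = [[0.0] * (vector_length + 1) for _ in centroids]
    for point in value_matrix:
        # Find the centroid with the smallest squared Euclidean distance.
        min_dist, min_centroid = None, 0
        for j, centroid in enumerate(centroids):
            dist = sum((centroid[i] - point[i]) ** 2
                       for i in range(vector_length))
            if min_dist is None or dist < min_dist:
                min_dist, min_centroid = dist, j
        # Add the point to that centroid's partial sum and bump its count.
        for i in range(vector_length):
            new_centroids[min_centroid][i] += point[i]
        new_centroids[min_centroid][vector_length] += 1
    return new_centroids
```

Because only sums and counts are emitted, the reducer can combine the partial results from any number of map tasks without seeing the raw points.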

 

 
  public override int Reduce(IntKey key, List<DoubleVectorValue> values,
   IOutputCollector<IntKey, DoubleVectorValue> outputCollector, string programArgs)
        {
            double[] newCentroid = generateNewCentroid(programArgs, values);
 
            DoubleVectorValue outValue = DoubleVectorValue.GetInstance(newCentroid);
            outputCollector.Collect(key, outValue);
            return 0;
        }
                

In the KmeansReducer we get the partial centers from all the map tasks in the values variable; we take the average of all the centers and create the new cluster centers for the next iteration.
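A sketch of that averaging step (Python, hypothetical name and shapes): each value is a partial sum vector whose last element is the point count, and the reducer divides the summed coordinates by the summed count.

```python
def reduce_centroid(values):
    """Average partial sums for one cluster. The last element of each
    partial vector is the number of points it covers."""
    width = len(values[0])
    dim = width - 1
    total = [0.0] * width
    for v in values:
        for i in range(width):
            total[i] += v[i]
    count = total[dim] if total[dim] > 0 else 1   # guard against an empty cluster
    return [total[i] / count for i in range(dim)]
```

For example, combining the partials [3.0, 2.0] and [9.0, 1.0] (sums with counts 2 and 1) yields the center 12.0 / 3 = 4.0.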

 

 
            double[][] output = new double[dynamicData.First().Value.BlockHeight][];
            foreach (var value in values)
            {
                output[value.Key.Key] = value.Value.First().Value;
            }
            MatrixValue<double> matrixValue = MatrixValue<double>.GetInstance(output,
                                                                              dynamicData.First().Value.BlockHeight,
                                                                              dynamicData.First().Value.Width, 0, 0);

            int numIterations = 10;
            string configString = RoleEnvironment.GetConfigurationSettingValue("KMeansNumIterations");
            if (!string.IsNullOrEmpty(configString))
            {
                numIterations = Int32.Parse(configString);
            }
            collector.Collect(IntKey.GetInstance(0), matrixValue);
            addIteration = Int32.Parse(reduceContext.ReduceTask.PartitionKey) < (numIterations - 1);
            return 0;

The KMeans merger gets the new cluster centers, passes them to the next iteration, and performs the iteration check.
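The merge step's iteration decision can be sketched as follows (Python, hypothetical name; in the real code num_iterations comes from the "KMeansNumIterations" configuration setting and the current iteration from the reduce task's partition key):

```python
def merge_step(new_centers, current_iteration, num_iterations=10):
    """Broadcast the new centers and decide whether another MapReduce
    cycle is needed (mirrors the addIteration check in the merger)."""
    add_iteration = current_iteration < (num_iterations - 1)
    return new_centers, add_iteration
```

When add_iteration is true, the framework re-broadcasts new_centers as the dynamic data for the next MapReduce cycle; otherwise the computation terminates.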

 

 
        public KmeansMR()
        {
            DirectTCPTranfer = 2;
        }
 
        public override bool ConfigMapRed(MRConf mrConf, CloudStorageAccount storageAccount, out object configOut)
        {
            configOut = null;
            return true;
        }
 
        public override string Name
        {
            get { return "KMeansMR"; }
        }
 
        public override Mapper < IntKey, MatrixValue<double>, IntKey, DoubleVectorValue, IntKey, MatrixValue<double>>
            MapperType
        {
            get { return new KmeansMatrixMapper(); }
        }
 
        public override Reducer<IntKey, DoubleVectorValue, IntKey, DoubleVectorValue> ReducerType
        {
            get { return new KmeansReducer(); }
        }
 
        public override Merger<IntKey, DoubleVectorValue, IntKey, MatrixValue<double>, IntKey, MatrixValue<double>>
            MergeType
        {
            get { return new KMeansMerge(); }
        }
 
        public override IInputFormat<IntKey, MatrixValue<double>> InputFormatType
        {
            get { return new CachedDoubleMatrixFormat(); }
        }
 
        public override IOutputFormat<IntKey, DoubleVectorValue> MapOutputFormatType
        {
            get { return new DoubleVectorOutputFormat(); }
        }
 
        public override IOutputFormat <IntKey, DoubleVectorValue> ReduceOutputFormatType
        {
            get { return new DoubleVectorOutputFormat(); }
        }
 
        public override IOutputFormat <IntKey, MatrixValue<double>> MergeOutputFormatType
        {
            get { return new DoubleMatrixOutputFormat(); }
        }
 
        public override IPartitioner PartitionerType
        {
            get { return new HashPartitioner(); }
        }
                

The constructor:

Setting DirectTCPTranfer to 2 selects direct TCP data transfer; 0 uses only blobs and 1 is hybrid.

MapperType, ReducerType, and MergeType:

These properties return the mapper, the reducer, and the merger we have discussed above.

The format and partitioner properties:

InputFormatType, MapOutputFormatType, ReduceOutputFormatType, MergeOutputFormatType, and PartitionerType set the data types for the inputs and outputs of the map, reduce, and merge tasks.

 

 
                  
            var twisterAzureClient = new TwisterAzureClient(storageAccount, jobID, mapQueue,
                                                            reduceQueue);
            twisterAzureClient.ProcessMapRed("KMeansMR", inputContainer, 0, vectorLength, numReduceTasks,
                                             outputContainer, broadcastURI, true);
                  

The TwisterAzureClient constructor:

storageAccount is the Azure storage account, jobID is the name for the job, mapQueue is the name of the map scheduling queue used in the Twister4Azure worker role, and reduceQueue is the name of the reduce scheduling queue.

The ProcessMapRed call:

"KMeansMR" is the name of the application that you specify in the driver, inputContainer is the name of the Azure blob storage container that holds the input files, 0 is the iteration number, vectorLength is the dimension of the data, numReduceTasks is the number of reduce tasks, outputContainer is the Azure storage container where any output data is written, broadcastURI is the path to the centroid file (containerName/fileName), and true indicates whether to perform the merge.