Khalid Mammadov

Finding Min and Max FX Rates for every country using Hadoop MapReduce

In this article I will put together MapReduce program that scans data in HDFS file system and make data analysis. For this I have selected freely available data from

https://github.com/datasets/exchange-rates GitHub repository. This data set have all historic FX rates for many countries.

There are tree files in data folder :

daily.csv
monthly.csv
yearly.csv

and as their names implies each have Daily, Monthly and Yearly FX rates respectively.

Below is the snippet from files:

Daily

Date,Country,Value
1971-01-04,Australia,0.8987
1971-01-05,Australia,0.8983
1971-01-06,Australia,0.8977
1971-01-07,Australia,0.8978
1971-01-08,Australia,0.899
1971-01-11,Australia,0.8967
1971-01-12,Australia,0.8964
1971-01-13,Australia,0.8957
1971-01-14,Australia,0.8937

Monthly

Date,Country,Value
1971-01-01,Australia,0.8944
1971-02-01,Australia,0.8898
1971-03-01,Australia,0.8894
1971-04-01,Australia,0.8898
1971-05-01,Australia,0.8894
1971-06-01,Australia,0.8894
1971-07-01,Australia,0.8896
1971-08-01,Australia,0.8837
1971-09-01,Australia,0.8712

Yearly

Date,Country,Value
1971-01-01,Australia,0.8803
1972-01-01,Australia,0.8387
1973-01-01,Australia,0.7047
1974-01-01,Australia,0.695
1975-01-01,Australia,0.7647
1976-01-01,Australia,0.8187
1977-01-01,Australia,0.9024
1978-01-01,Australia,0.874
1979-01-01,Australia,0.8947

Goal

To find minimum and maximum registered FX rate for every country and for each year.

I am taking into account possibility missing data so, we will scan every file and look for data in each of them.

Data preparation

First thing first we will need to download and upload data into hadoop cluster. I am using my docker cluster that I have created in one of my previous articles.

So, lets download data:

cd ~
mkdir fxdata
cd fxdata
git clone https://github.com/datasets/exchange-rates.git

Upload data into hdfs:

cd data

#Create folder and Copy files
hadoop fs -cp -r exchange-rates /users/khalid/

#Verify
hadoop fs -ls /users/khalid/

Test data is ready now we can move and look at the program bit

Application

For my development I am using Eclipse IDE and Maven for dependency management. I have already developed the application and you can fork it in below git hub repo:

git clone https://github.com/khalidmammadov/hadoop_mapreduce_minmax

Algorithm

As any Map Reduce program it has two parts map and reduce. In map phase it reads data line by line and generate Key-Value pair for reducer. Here key will be joined Year and Country data e.g. 2010 France. And value will the FX rate.  Next Hadoop does nice job for us by sorting and partitioning this keys together and feeding to the reducer. In the reducer phase  it calculates minimum and maximum fx rates and submits it to the context with new Key-Value pairs for each result i.e. . 1971 Australia MIN.

Below is the full code:

package com.fx.minmaxfx;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App {

    public static class ReaderMapper
    extends Mapper < Object, Text, Text, DoubleWritable > {

        private Text YearCountry = new Text();
        private final static String comma = ",";

        private DoubleWritable rate = new DoubleWritable();

        public void map(Object key, Text value, Context context) throws IOException,
        InterruptedException {

            try {
                //Split columns
                String[] columns = value.toString().split(comma);

                if (columns.length & amp; amp; amp; amp; amp; lt; 3 ||
                    columns[2] == null ||
                    columns[2].equals("Value")) {
                    return;
                }
                //Set FX rate
                rate.set(Double.parseDouble(columns[2]));

                //Construct key
                YearCountry.set(columns[0].substring(0, 4) + " " + columns[1]);

                //Submit value into the Context
                context.write(YearCountry, rate);
            } catch (NumberFormatException ex) {
                context.write(new Text("ERROR"), new DoubleWritable(0.0 d));
            }
        }
    }

    public static class MinMaxReducer
    extends Reducer < Text, DoubleWritable, Text, DoubleWritable > {
        private DoubleWritable minW = new DoubleWritable();
        private DoubleWritable maxW = new DoubleWritable();

        public void reduce(Text key, Iterable < DoubleWritable > values,
            Context context
        ) throws IOException,
        InterruptedException {
            double min = 9999999999999999 d;
            double max = 0;

            for (DoubleWritable val: values) {
                min = val.get() < min ? val.get() : min;
                max = val.get() > max ? val.get() : max;
            }
            minW.set(min);
            maxW.set(max);

            //Set key as Year Country Min/Max

            Text minKey = new Text(key.toString() + " " + "MIN");
            Text maxKey = new Text(key.toString() + " " + "MAX");

            context.write(minKey, minW);
            context.write(maxKey, maxW);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Min Max FX by YEAR");
        job.setJarByClass(App.class);
        job.setMapperClass(ReaderMapper.class);
        //job.setCombinerClass(MinMaxReducer.class);
        job.setReducerClass(MinMaxReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Compiling

As I said earlier I am using Maven for package and dependency management. Below is how to compile and package the code:

mvn compile

Output:

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building minmaxfx 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ minmaxfx ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/khalid/hadoop_mapreduce_minmax/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ minmaxfx ---
[INFO] Nothing to compile - all classes are up to date
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.591 s
[INFO] Finished at: 2017-12-19T22:10:08Z
[INFO] Final Memory: 13M/319M
[INFO] ------------------------------------------------------------------------

Packaging

Now lets package the code and create jar archive which we will use for actual code execution.

mvn package


You should have similar to below output:

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building minmaxfx 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ minmaxfx ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/khalid/hadoop_mapreduce_minmax/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ minmaxfx ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ minmaxfx ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/khalid/hadoop_mapreduce_minmax/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ minmaxfx ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ minmaxfx ---
[INFO] Surefire report directory: /home/khalid/hadoop_mapreduce_minmax/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.fx.minmaxfx.AppTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.008 sec

Results :

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ minmaxfx ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.037 s
[INFO] Finished at: 2017-12-19T22:13:46Z
[INFO] Final Memory: 16M/319M
[INFO] ------------------------------------------------------------------------

Execution

Now we can go and execute the code using jar file on hdfs cluster for our FX data set:

hadoop jar target/minmaxfx-0.0.1-SNAPSHOT.jar com.fx.minmaxfx.App \
/users/khalid/exchange-rates/data/ \
/users/khalid/out

After successful execution you should find part-r-00000 file in new “out” folder. Verify:

hadoop fs -ls /users/khalid/out

Lets now look to the output:

hadoop fs -cat /users/khalid/out11/part-r-00000| head -100

You should get something like this:

1971 Australia MIN	0.8412
1971 Australia MAX	0.899
1971 Austria MIN	23.638
1971 Austria MAX	25.873
1971 Belgium MIN	45.49
1971 Belgium MAX	49.73
1971 Canada MIN	0.9933
1971 Canada MAX	1.0248
1971 Denmark MIN	7.0665
1971 Denmark MAX	7.5067
1971 Finland MIN	4.1926
1971 Finland MAX	4.2154
1971 France MIN	5.3947
1971 France MAX	5.5332
1971 Germany MIN	3.2688
1971 Germany MAX	3.637
1971 Ireland MIN	0.3958
1971 Ireland MAX	0.4157
1971 Italy MIN	600.57
1971 Italy MAX	624.65
1971 Japan MIN	314.96
1971 Japan MAX	358.44
1971 Malaysia MIN	2.8885
1971 Malaysia MAX	3.0867
1971 Netherlands MIN	3.2784
1971 Netherlands MAX	3.6003
...

So, as you can see data is grouped by year and country and minimum, maximum rate registered for that year shown.

Summary

This short code shows how easy is to make calculations on huge amounts of data. I hope you liked this article.