Wednesday, February 13, 2013

HA and DR in Storm


Storm appears to be fault tolerant, with the details below:

 
 HA for a worker failure
 
If a worker fails, its Supervisor will restart it. If it keeps failing and cannot send heartbeats to Nimbus, Nimbus will reassign the work to another node.
 
 HA for a Node failure
 
In this case one of the slave nodes is down; the tasks assigned to it will time out and Nimbus will reassign them to another node.
 
HA for the Nimbus and Supervisor daemons
 
The Nimbus and Supervisor daemons are designed to be fail-fast and stateless (all their state is kept in Zookeeper or on disk). The documentation says these daemons must run under supervision. This is the key point: "under supervision" means we need to configure a tool like monit or Upstart to restart the daemons seamlessly.
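As a rough sketch of what running "under supervision" can look like with Upstart (one of the tools named above); the install path /opt/storm below is an assumption, not taken from the Storm docs:

# /etc/init/storm-nimbus.conf  (hypothetical job file)
description "Storm Nimbus daemon"
start on runlevel [2345]
stop on runlevel [016]
respawn
exec /opt/storm/bin/storm nimbus

A similar job can be written for the Supervisor daemon (storm supervisor).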
 
Nimbus
 
If we lose the Nimbus node, workers will still continue to run, and Supervisors will continue to restart workers if they die. But without Nimbus, workers will not be reassigned to other machines when a worker's machine also dies.
 
Now, according to Nathan (as of Jan 17, 2012):
"There are plans to make Nimbus highly available in the future"
 
 
 
We can therefore think of:
 
1. A Zookeeper ensemble laid out to survive a rack failure
2. Zookeeper Observers for cross-data-center DR (http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html)
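For option 2, the linked page boils down to marking the nodes in the remote data center as observers in zoo.cfg; a minimal sketch with hypothetical host names and ports:

# on each observer node
peerType=observer
# in the server list of every node, tag the observer entries
server.4=zk-observer.dc2.example.com:2888:3888:observer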




 
 
The caveat is that none of this is available out of the box today; it needs experimentation and a feasibility study before a production-ready deployment.
I am also following up with Nathan (the Storm creator) and the storm-user group to understand possible workarounds.
The good news is that many companies are already using Storm in production.
 


Wednesday, April 25, 2012

Pig Performance Measures


Performance tuning for Pig scripts


       1. Avoid SPLIT; use FILTER

            A SPLIT is effectively a set of implicit filters, each routing the input rows that satisfy its condition into a separate relation.

Example : 

Input 1
=======
1 2 3 4
5 6 7 8
0 46 47 48


Example 1 (using SPLIT):

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
SPLIT A INTO X IF a > 0, Y IF a > 40;
DUMP X;
DUMP Y;

Example 2 (using FILTER):

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
X = FILTER A BY a > 0;
Y = FILTER A BY a > 40;
DUMP X;
DUMP Y;

DESCRIPTION:
In the example above, using FILTER instead of SPLIT improves the performance of the Pig job: relation A is not evaluated multiple times inside a single statement, and each filtered relation can be stored as an intermediate output and reused for other processing.

2. Store intermediate results:
It is good to store intermediate results so that further processing can be done on top of them instead of reprocessing the initial input, which is time consuming.

Example:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
X = FILTER A BY a > 0;
L = GROUP X BY a;
Y = FOREACH L GENERATE group, SUM(X.b);
Z = FOREACH L GENERATE group, COUNT(X.b);
STORE Y INTO '/home/hadoop/output/';

In the example above, instead of reprocessing A to generate both the sum and the count of a field, it is better to store the result of the filtered and grouped data and do further processing on top of it, which avoids reprocessing A for the same condition.

3. Remove unnecessary DUMPs:
             DUMP is an interactive statement; using it disables multi-query optimization and consumes more time than a STORE statement.


Example:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
X = FILTER A BY a > 0;
DUMP X;
L = GROUP X BY a;
Y = FOREACH L GENERATE group, SUM(X.b);
Z = FOREACH L GENERATE group, COUNT(X.b);
STORE Y INTO '/home/hadoop/output/';

Description:
        In the example above, aliases A and X are processed up to the DUMP (which is treated as one job), and then A, X, L, Y, Z and the final STORE are executed, so the same script runs as two separate jobs because of the DUMP statement. Remove all DUMP statements that were only added for debugging.

4. Avoid unnecessary User Defined Functions:
           Unless it is really needed, avoid UDFs: they require extra processing beyond the Pig script itself, which costs time and CPU. Pig provides many built-in functions, such as SUM, COUNT, DISTINCT and FLATTEN, plus operators like GROUP BY and FILTER, which can be used instead of elaborating everything in a UDF.
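As a small sketch, a hypothetical custom "sum" UDF can usually be replaced by the built-in SUM; the input path and schema are reused from the earlier examples:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
B = GROUP A BY a;
C = FOREACH B GENERATE group, SUM(A.b);  -- built-in SUM instead of a hypothetical custom myudfs.MySum()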

5. Other ways to improve pig performance

a. Proper usage of operators:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
B = FILTER A BY NOT (NOT (a > 5) OR (a >= 10));

This can be simplified using an AND, which saves the extra burden of evaluating two negations and an OR to obtain the same result:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
B = FILTER A BY a > 5 AND a < 10;

 b. Avoid unnecessary calculations

 A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
 B = FILTER A BY a > 1 + 2;

Here, instead of evaluating 1 + 2 for each value of a, the condition can be simplified with the pre-computed value of 3:

B = FILTER A BY a > 3;

c. Avoid repeating conditions in different forms

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
 B = FILTER A BY a > 0 AND a > 3;

   If a is greater than 3 it is obviously greater than 0, so the first check can be dropped and the filter simplified to:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
 B = FILTER A BY a > 3;

d. Eliminate conditions that always evaluate to TRUE, for example:

  B = FILTER A BY 1 == 1;

6. Minimize the number of operations performed

Example :

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
B = FOREACH A GENERATE a + b AS x, a - c AS y, c;
C = FOREACH B GENERATE x - c AS z, y;
STORE C INTO '/home/hadoop/output/';

The operations above can be simplified to:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
B = FOREACH A GENERATE a + b - c AS z, a - c AS y;
STORE B INTO '/home/hadoop/output/';

Calculating a + b in one FOREACH, storing the result in an intermediate relation and then subtracting c in a second FOREACH is the same as computing a + b - c in a single expression, which saves an extra pass over the data and the intermediate relation.

7. Limit the number of output lines
          Store or dump only the number of lines you are interested in by setting a LIMIT.
         
A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
B = FILTER A BY a > 50;
C = LIMIT B 10;

Here, instead of listing all the values of B, limit the output to the 10 values you actually want to view.

8. Push up filters:
Instead of processing the entire data set, filter the input down to what is needed and do the processing on top of that.
Example :
A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);
B = FILTER A BY b > 0;
C = GROUP B ALL;
D = FOREACH C GENERATE SUM(B.b);

In the example above, instead of computing the sum over every value of field b, first filter for the values greater than zero and then compute the sum, which avoids processing unwanted data.

9. Do explicit casting:
           Instead of letting Pig infer the data types, declare them explicitly when the types are known, which saves time.

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int);

As shown above, load the data with explicit types instead of:
A = load '/home/hadoop/input/' using PigStorage('\t') as (a,b,c,d);

10. Project only the necessary fields while processing:

      A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int,e:int);
      B = FILTER A BY a > 0;
      C = FILTER B BY b > 50;
      B1 = GROUP B ALL;
      C1 = GROUP C ALL;
      D = FOREACH B1 GENERATE COUNT(B.c);
      F = FOREACH C1 GENERATE COUNT(C.d);
      G = UNION D, F;

In the example above, field e is used nowhere in the processing, so it can be dropped right after loading the data into alias A, which saves time and memory and improves performance.

The script above can be modified as:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int,c:int,d:int,e:int);
A1 = FOREACH A GENERATE a, b, c, d;
B = FILTER A1 BY a > 0;
C = FILTER B BY b > 50;
B1 = GROUP B ALL;
C1 = GROUP C ALL;
D = FOREACH B1 GENERATE COUNT(B.c);
F = FOREACH C1 GENERATE COUNT(C.d);
G = UNION D, F;

11. UDF considerations:

         UDFs are used to add custom functionality to a Pig script. A badly written or badly packaged UDF can degrade the performance of the whole script. Keep the following in mind when creating a UDF (a small sketch follows this list):

A.    Place the UDF classes in a proper package.
B.    Register the jars before calling the UDFs.
C.    Do not register unwanted jars in the Pig script.
D.    Call the functions with the proper package and class names.
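A minimal sketch of points A to D in practice; the jar path, package and class names below are hypothetical:

REGISTER /home/hadoop/udfs/my-udfs.jar;                 -- register only the jar you need (B, C)
A = load '/home/hadoop/input/' using PigStorage('\t') as (name:chararray);
B = FOREACH A GENERATE com.example.pig.ToUpper(name);   -- fully qualified package and class name (A, D)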
12. Drop null values before calculations:
           Filter out records whose relevant fields are null before joins and aggregations. Null values have no significance in JOIN, SUM and other algebraic functions (they are ignored or dropped anyway), so removing them early avoids carrying them through the job.

For example, the Pig script can be written as:
A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int);
B = FILTER A BY a is not null;
C = FILTER A BY b is not null;
D = JOIN B by a,C by b;

13. Use the PARALLEL clause
          The PARALLEL clause lets us set the number of reduce tasks that run in parallel. If it is not set, a job gets only one reducer by default, which hurts the performance of the Pig script. For example, you can set the script-wide default as:

SET default_parallel 10;
This sets the default number of reducers that run in parallel to 10.
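Besides the script-wide default, the PARALLEL keyword can also be set on individual operators; a small sketch reusing the input from the earlier examples:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int);
B = GROUP A BY a PARALLEL 10;   -- this GROUP alone runs with 10 reducers
C = FOREACH B GENERATE group, SUM(A.b);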

14. Load only the needed input
          If you only want to process the first 50 rows of a file, limit the relation to those 50 rows instead of working on the entire file.
For example:
A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int);
B = LIMIT A 50;

15. DISTINCT VS GROUP BY
         If you want to find the distinct values of a field, there are two ways to obtain them: GROUP BY and DISTINCT. It is generally better to use DISTINCT to boost the performance of the Pig script.

Example:
A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int);
B = FOREACH A GENERATE a;
C = GROUP B BY a;
D = FOREACH C GENERATE group;

This can be improved by changing it to:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int);
B = FOREACH A GENERATE a;
C = DISTINCT B;

16. Filter early and often
         If possible, filter values before doing a GROUP BY. The combiner gives a large performance boost, and it can only be used when there are no other operators between the GROUP BY and the FOREACH that computes the aggregate, so push filters ahead of the GROUP.

Example:
A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int);
B = GROUP A BY a;
B1 = FILTER B BY group > 0;   -- an operator between the GROUP and the FOREACH disables the combiner
C = FOREACH B1 GENERATE group, SUM(A.b);

This can be optimized as:

A = load '/home/hadoop/input/' using PigStorage('\t') as (a:int,b:int);
A1 = FILTER A BY a > 0;       -- the same filter applied before the GROUP lets the combiner run
B = GROUP A1 BY a;
C = FOREACH B GENERATE group, SUM(A1.b);

17. Keep the largest data set last
While doing a JOIN or COGROUP with inputs of different sizes, always put the largest data set at the end.
Doing so improves Pig's performance and makes the job more efficient.
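A small sketch of what this means for a JOIN; the relation names and paths are assumptions. Pig streams the last relation in a JOIN through the reducers instead of caching it, so the largest input should go last:

small = load '/home/hadoop/lookup/' using PigStorage('\t') as (id:int, name:chararray);
big   = load '/home/hadoop/events/' using PigStorage('\t') as (id:int, ts:long);
J = JOIN small BY id, big BY id;   -- 'big' is last, so the biggest input is streamed, not cached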


Saturday, March 17, 2012

Hadoop NameNode Failure

Scenario 1:

The HDFS fsimage and edit log are written to multiple locations, including an NFS mount.
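Writing the metadata to more than one location is just a comma-separated dfs.name.dir in hdfs-site.xml; a sketch with hypothetical paths:

<property>
  <name>dfs.name.dir</name>
  <!-- one local disk plus an NFS mount; the fsimage and edits are written to both -->
  <value>/data/dfs/name,/mnt/nfs/dfs/name</value>
</property>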

A) NameNode Daemon Crash :
Solution:
Just restart the Namenode process

B) The host where the NameNode is running is down.

Solution:

1. Start the NameNode on a different host with an empty dfs.name.dir.
2. Point dfs.name.dir to the NFS mount that holds the copy of the metadata.
OR
3. Start the NameNode with the -importCheckpoint option after pointing fs.checkpoint.dir to the checkpoint directory of the Secondary NameNode (a sketch follows the note below).
4. Change fs.default.name to the backup host's URI and restart the cluster with all the slave IPs in the slaves file.

Note - with the checkpoint approach we may lose any edits that happened after the last checkpoint.
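A rough sketch of option 3 on the replacement host (Hadoop 1.x-style command; the directory layout is an assumption):

# In the Hadoop configuration on the new host:
#   dfs.name.dir      -> an empty local directory
#   fs.checkpoint.dir -> a copy of the Secondary NameNode's checkpoint directory
hadoop namenode -importCheckpoint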



Scenario 2:

The HDFS fsimage is written into a single directory.

A ) NameNode Daemon Crash:
Solution : Unknown

B) The host where the NameNode is running is down.

Solution:


1. Create a blank directory and point dfs.name.dir to it.
2. Start the NameNode with -importCheckpoint after pointing fs.checkpoint.dir to the checkpoint directory of the Secondary NameNode.
3. Change fs.default.name to the backup host's URI and restart the cluster with all the slave IPs in the slaves file.

This way we would again lose any edits made after the last checkpoint.

Alternatively, instead of changing fs.default.name to the backup host's URI, if we can give the new NameNode the old NameNode's IP address, the fix becomes even easier, since the cluster configuration does not need to change.

HA Namenode

More info on this can be found here: https://ccp.cloudera.com/display/CDHDOC/CDH3+Deployment+on+a+Cluster

Sunday, August 28, 2011

Copy To Hadoop In Scale - Part 2

For a long time I was restless about how to copy a huge number of files into HDFS.
I frantically "Googled" and asked questions on different forums. Some of my questions were foolish. How do I know that?
Forum folks answered, or sometimes did not. But I kept trying, as I am really inspired by the quote
"STAY HUNGRY STAY FOOLISH". Finally I found a way to use the Hadoop data node monsters to help me copy the files.
Hadoop is like a monster CPU and a monster hard drive. Then I found DistCp, a unique tool which is very useful for Hadoop developers.
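DistCp is itself a MapReduce job, so the copy work is spread across the data nodes; a typical invocation (the cluster addresses and paths here are made up) looks like:

hadoop distcp hdfs://nn1:8020/source/logs hdfs://nn2:8020/backup/logs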
And then came my own tool:

https://github.com/Jagaran/HadoopCopier

This tool uses the Cloudera Hadoop CDH3 libraries.

There are many interesting findings in "My Lab" to follow up on in this endeavor of staying "HUNGRY and FOOLISH".
Sometimes you have people, utilities and things right next to you that can do the job, and yet you search in distant places.
In my native West Bengal, India, there is a famous saying: "The rural sage doesn't get the credit."
I found utilities that had been sitting on my laptop for the last 6 months that could do the job I had searched the whole web for.
For you it may be different. Think once: are you missing something?

Stay tuned

JD

Thursday, August 25, 2011

Interesting !!!!!!

A two-tier architecture is one where you have some web pages or a simple application running on the client side and, at the back end, a server that answers the client's queries. In a three-tier application you have a client application, a middleware layer, and a database or EIS system running at the back end; the middleware consists of a web server and an application server. Now the interesting part: I have found that the human body is a typical example of this architecture. I have seen people who have a good appearance (the client side) and a brain, some kind of server, that answers queries from all five senses, or in this case five different client programs running at the front end. Again, some people have additional processes running in the brain to validate the business logic, in this case how to help themselves at any cost. The brain is an excellent database with built-in garbage collection, as it deletes less-referenced data after a period of time. It is truly amazing that whatever we are trying to build today, or may invent tomorrow, is already present within our own body.

Monday, August 22, 2011

Map Reduce Job to Calculate Total Unique Lines in Your Dataset

Now that Pandora's box is open, I will unleash the power of this mighty MR framework with another very useful requirement: "Give me the total number of unique lines in the dataset." Jumbo kicks in as we fire the following job, and it rolls across your data nodes to give you the answer:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class CalculateDistinct {

    // Mapper: emit each input line as the key with a count of 1,
    // so identical lines are grouped together for the combiner/reducer.
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text("");

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            word.set(value.toString());
            output.collect(word, one);
        }
    }

    // Combiner: collapse each distinct line seen by a map task into a single
    // record under the fixed key "UniqueUsers" (relies on the combiner running once per map task).
    public static class Combine extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            Text text = new Text("UniqueUsers");
            while (values.hasNext()) {
                sum += 1;
                values.next();
            }
            output.collect(text, new IntWritable(sum));
        }
    }

    // Reducer: count how many distinct-line records arrived under "UniqueUsers",
    // which gives the number of unique lines.
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += 1;
                values.next();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(CalculateDistinct.class);
        conf.set("fs.default.name", "hdfs://localhost:8020/home/hadoop/");
        conf.setJobName("Calculate Distinct");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combine.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path("file:///Users/jagarandas/Work-Assignment/Analytics/analytics-poc/sample-data/"));
        FileOutputFormat.setOutputPath(conf, new Path("/home/hadoop/sample-data2/"));
        JobClient.runJob(conf);
    }
}
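To try the job above, compile it against the Hadoop jars, package it and submit it with the hadoop launcher; the jar and directory names are assumptions, and the input/output paths are hard-coded in main():

javac -classpath `hadoop classpath` -d classes CalculateDistinct.java
jar -cf calculate-distinct.jar -C classes .
hadoop jar calculate-distinct.jar CalculateDistinct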

Map Reduce Job to Calculate Total Lines in Your Dataset

"Get the total lines in our logs " - A common requirement that boils down from an Application Manager to an poor Hadoop Engineer. "What is the ETA ? " - Next question in the firing line. Common belief now is "Hadoop is the Gin's Magic Lamp", just stand infront and say "I need this......" and it would say "Aka...Here is your total line count". I would rather say hadoop is like "Pandora's Box", you open it and discover many things. So let me come to the point. I have the solution for the poor hadoop guy as below:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class LineCount {

    // Mapper: emit a constant key with a count of 1 for every input line.
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text("AKA Total Lines For You....");

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            output.collect(word, one);
        }
    }

    // Reducer (also used as the combiner): sum up the 1s to get the total line count.
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(LineCount.class);
        conf.set("fs.default.name", "hdfs://localhost:8020/home/hadoop/");
        conf.setJobName("LineCount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path("file:///Users/jagarandas/Work-Assignment/Analytics/analytics-poc/sample-data/"));
        FileOutputFormat.setOutputPath(conf, new Path("/home/hadoop/sample-data1/"));
        JobClient.runJob(conf);
    }
}