
Develop Map/Reduce with reduced assumptions

It all started with this odd bug…

One of our teams is writing a service that, among other things, runs map/reduce jobs built as Pig scripts with Java UDFs. The scripts accept CSV files that have a text header followed by lines of data. Each script performs some grouping and then calls a UDF, which essentially filters, enriches and transforms the data, outputting another CSV with a new header followed by data – something like the following:

  input = load '/data/INPUT2.dat' using PigStorage(',')...
  grouped = GROUP input BY key;
  results = FOREACH grouped GENERATE evaluateUDF(input) as output;
  STORE results...

and all was well. The job spawns a few map tasks, partitions and groups the data, and runs a single reduce where the actual evaluation happens.

Then someone wanted it to run faster. We can do that by adding reducers: add PARALLEL X to the group statement, where X is the number of reducers we want. And then the trouble began. Everything worked well for up to 5 reducers – go any higher and a few results were lost. The script is pretty basic (the actual script looks a little different, but that’s basically it) and the UDF is not earth-shattering, yet sometimes results are lost.

Even though I am very busy (well, not really, but you know, some members of my team might be reading this…), this was so annoying I just had to understand what happened, so I took it upon myself to debug it. The main problem was that I couldn’t write a unit test to reproduce the behavior – it only occurred when running on the large file…

The first thing I noticed was that we had duplicate results in the output. I removed those and saw that we were actually losing only one result (again, only with 5 or more reducers). I diffed the correct output against the faulty one, so I now had the problematic key.

I told myself something must be wrong with the partitioner. The default partitioner that Pig (and Hadoop in general) uses is fairly simple: it takes the hash code of the key, ANDs it with max int (so we get a non-negative number) and takes the remainder of dividing that by the number of partitions:

  return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
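To make the arithmetic concrete, here is a minimal standalone sketch of the same formula in plain Java, without any Hadoop or Pig classes. The class name, helper names and sample keys are made up for illustration. It shows that the mask keeps the result non-negative even for negative hash codes, and that the set of keys sharing a partition shifts as the number of reducers changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HashPartitionSketch {

    // Same arithmetic as the one-liner above: clear the sign bit, then take the remainder.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Groups keys by the partition they would be sent to, in input order.
    static Map<Integer, List<String>> assign(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            byPartition
                .computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                .add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // Hypothetical keys, not taken from the real data set.
        List<String> keys = Arrays.asList("1001", "1002", "1003", "1004", "1005", "1006");
        for (int reducers : new int[] {1, 3, 5}) {
            System.out.println(reducers + " reducer(s): " + assign(keys, reducers));
        }
    }
}
```

With a single reducer every key lands in partition 0. With more reducers the keys spread out, so a different key ends up at the head of each partition.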

I couldn’t really see what might be wrong with this, but I still wrote my own instead:

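  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.mapreduce.Partitioner;
  import org.apache.pig.impl.io.PigNullableWritable;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public class Partition extends Partitioner<PigNullableWritable, Writable> {
      private static final Logger logger = LoggerFactory.getLogger(Partition.class);

      @Override
      public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
          try {
              if (key.getValueAsPigType() instanceof String) {
                  // Strip any double quotes from the key before reading it as a number.
                  String skey = ((String) key.getValueAsPigType()).replaceAll("\"", "");
                  // Long.parseLong parses the text; Long.getLong would look up a system property instead.
                  long ikey = Long.parseLong(skey);
                  return (int) ((ikey & Long.MAX_VALUE) % numPartitions);
              } else {
                  // Same arithmetic as the default hash partitioner.
                  return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
              }
          } catch (Exception e) {
              // Keys that cannot be handled all fall back to partition 0.
              logger.warn("Falling back to partition 0", e);
              return 0;
          }
      }
  }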

Problem solved – everything works. I guess that damn Hadoop partitioner is all flawed – not! So now I am even more annoyed, because I really don’t understand what’s going on here. But at least now I control the partitioner, so I added some logging. I saw that the problematic key goes to partition 0, so I made everything go to partition 0, and everything worked. Then it finally struck me. I logged all the keys that go into partition 0 and saw that the problematic key is the first one. Duh! I should have noticed that earlier. Here’s what the UDF code essentially looks like:

  public DataBag exec(Tuple input) throws IOException
  {
      if (session == null) {
          session = ....
          return addHeadersNames();
      }
      return eval(input);
  }

The Tuple is the input we get from the Pig script into our UDF. What happens here is that we throw out the first input we get and just output the headers for our result file. As it happens, when we have enough partitions, a significant input (the missing result) is the first one we get in the partition.
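The effect can be reproduced without a cluster. The sketch below is a plain-Java simulation, not the real UDF: the evaluator class, the header string and the single-letter keys are all hypothetical stand-ins, and it assumes that each reducer works with its own fresh UDF instance. Every instance swallows whatever it is handed first, so the number of dropped groups grows with the number of non-empty partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FirstCallHeaderSketch {

    static final String HEADER = "NEW_HEADER"; // hypothetical stand-in for the output header

    // Stand-in for the UDF: the first call on each instance returns the header
    // and silently drops whatever input it was given.
    static class StatefulEvaluator {
        private boolean sessionReady = false;

        String exec(String group) {
            if (!sessionReady) {
                sessionReady = true;
                return HEADER;
            }
            return "result:" + group;
        }
    }

    // Simulates a job: groups are hash-partitioned and each reducer gets a fresh evaluator.
    // Groups keep their input order within a partition, which is a simplification.
    static List<String> run(List<String> groups, int numReducers) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String g : groups) {
            partitions.get((g.hashCode() & Integer.MAX_VALUE) % numReducers).add(g);
        }
        List<String> output = new ArrayList<>();
        for (List<String> partition : partitions) {
            StatefulEvaluator udf = new StatefulEvaluator();
            for (String g : partition) {
                output.add(udf.exec(g));
            }
        }
        return output;
    }

    // Counts header lines (headers == true) or result lines (headers == false).
    static long count(List<String> output, boolean headers) {
        return output.stream().filter(line -> line.equals(HEADER) == headers).count();
    }

    public static void main(String[] args) {
        // Ten hypothetical group keys.
        List<String> groups = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        for (int reducers : new int[] {1, 2, 5}) {
            List<String> out = run(groups, reducers);
            System.out.println(reducers + " reducer(s): headers=" + count(out, true)
                + ", results=" + count(out, false));
        }
    }
}
```

With these ten keys, one reducer produces one header and nine results, while five reducers produce five headers and only five results.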

The developer who wrote this assumed that the first input line the UDF would get would be the header of the input file, so it would be all right to discard it and output the new header instead. However, Hadoop doesn’t work like that: when we have code that runs in a map/reduce job, it might be called in different circumstances than we originally assumed, and we have to pay attention to that. For instance, another problem with the code above is that when run with multiple reducers you get multiple copies of the output header…
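One way to drop the positional assumption is to decide what to skip by looking at the content of the input rather than at the call order, and to write the output header once, outside the per-record code. The following is only a sketch of that idea in plain Java, not the fix the team actually shipped. The header key value, the class names and the sample keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;

class ContentBasedHeaderSketch {

    // Hypothetical: the value found in the key column of the input header row.
    static final String INPUT_HEADER_KEY = "key";

    static class Evaluator {
        private boolean sessionReady = false;

        Optional<String> exec(String group) {
            if (!sessionReady) {
                sessionReady = true; // lazy setup only; no early return, so no input is dropped
            }
            if (INPUT_HEADER_KEY.equals(group)) {
                return Optional.empty(); // skipped because of what it is, not when it arrives
            }
            return Optional.of("result:" + group);
        }
    }

    // Simulates a job with one fresh evaluator per reducer; returns the results sorted.
    static TreeSet<String> run(List<String> groups, int numReducers) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String g : groups) {
            partitions.get((g.hashCode() & Integer.MAX_VALUE) % numReducers).add(g);
        }
        TreeSet<String> output = new TreeSet<>();
        for (List<String> partition : partitions) {
            Evaluator udf = new Evaluator();
            for (String g : partition) {
                udf.exec(g).ifPresent(output::add);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> groups = Arrays.asList("key", "a", "b", "c"); // hypothetical input groups
        System.out.println("NEW_HEADER"); // written once by the caller, not by every reducer
        for (String line : run(groups, 5)) {
            System.out.println(line);
        }
    }
}
```

In this sketch the set of results is the same for any number of reducers, because nothing depends on which group a given instance happens to see first.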

So there you have it, the title says it all: develop map/reduce code with reduced assumptions.


