Use a lookup HashMap in a Hive script with a UDF

For the past few years I have been using custom jars for my MapReduce jobs, and because that is pure Java programming, I had a lot of flexibility. But writing Java means a lot of code to maintain, and most of my MapReduce jobs are just joins with a little spice in them, so moving to Hive looked like a better path.

Problem

The MapReduce job I face here is a left outer join of two datasets on the same key. Because it is an outer join, there will be NULL values, and for those NULLs I want to look up a default value to assign from a map.

For example, I have two datasets:
dataset 1: KEY_ID CITY SIZE_TYPE
dataset 2: KEY_ID POPULATION

I want a left outer join on KEY_ID; for those cities that cannot find a POPULATION, I will look up their SIZE_TYPE in a map and assign the default value.

Of course I could treat the default value map as a third dataset in the join. The reason I don't want to do that is that datasets 1 and 2 are huge (say millions of rows) while the map is tiny (just a few hundred rows), so joining all three datasets would hurt performance.

With a custom jar, I can use the Distributed Cache to build this map. When I add the map file as a distributed cache file, it is distributed to every node, and each mapper or reducer can read it. So for each joined pair, I can simply use the SIZE_TYPE as the key to find its corresponding value.
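
For comparison, here is a rough sketch of how that looks in a custom jar. The mapper below is my own illustration (the class name, key/value types, and the assumption that the map file is the first cached file are not from this post); it reads the cached file once in setup() and keeps the HashMap in memory for all records.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: builds the lookup map once per task from the Distributed Cache.
public class DefaultPopulationMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Map<String, Integer> populationMap = new HashMap<String, Integer>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Files added to the Distributed Cache are localized on every node;
        // here we assume the map file is the first (and only) cached file.
        Path[] cacheFiles = context.getLocalCacheFiles();
        BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] pair = line.split("\t");
                populationMap.put(pair[0], Integer.parseInt(pair[1]));
            }
        } finally {
            reader.close();
        }
    }

    // map() can then call populationMap.get(sizeType) for joined rows
    // that are missing a population, just like the Hive UDF further below.
}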

In Hive this is harder: we are writing a SQL-like script, the built-in functionality is limited, and we cannot easily use a HashMap the way we would in Java.

Luckily, Hive supports UDFs (User-Defined Functions) and ADD FILE, and we can use them to do the same thing.

ADD FILE filename in Hive is the equivalent of adding a distributed cache file in Java: the file is distributed to every node.

Writing a UDF means writing Java code that can be called from the Hive script. Here, we will read the file in our UDF and build a HashMap, and the HashMap will only be built once.

Solution

Suppose we have two datasets and one map file:

City Dataset

1	New York	MEDIUM
2	Beijing	LARGE
3	Chicago	LARGE
4	Tempe	SMALL
5	LONDON	LARGE

Population Dataset

1	  8000000
2	  20693000
3	  2700000

Default Population map file

LARGE	  1000000
MEDIUM	  500000
SMALL	  100000

Hive Script

In the Hive script, I create three tables: two for the input datasets and one for the output.

As you can see in the code, I use ADD FILE /home/lichu/Downloads/defaultPopulation; to add the defaultPopulation file to the distributed cache, and I use

ADD JAR /home/lichu/UDF.jar;
CREATE TEMPORARY FUNCTION getDefaultPopulation AS 'test.GetDefaultPopulation';

to add the UDF jar file (I will show how to write this function in the next section) and to create a temporary function bound to the class in that jar. Then I call the getDefaultPopulation function in the SELECT block.

-- author: Chun
-- email : me@lichun.cc

SET city_input = '/home/lichu/Downloads/city';
SET population_input = '/home/lichu/Downloads/population';
SET city_population_output = '/home/lichu/Downloads/city_population_output';

ADD FILE /home/lichu/Downloads/defaultPopulation;
ADD JAR /home/lichu/UDF.jar;
CREATE TEMPORARY FUNCTION getDefaultPopulation AS 'test.GetDefaultPopulation';

DROP TABLE IF EXISTS city;
CREATE EXTERNAL TABLE city (
     KEY_ID           INT,
     NAME             STRING,
     SIZE_TYPE        STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
STORED AS TEXTFILE LOCATION ${hiveconf:city_input};

DROP TABLE IF EXISTS population;
CREATE EXTERNAL TABLE population (
     KEY_ID           INT,
     POPULATION       INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
STORED AS TEXTFILE LOCATION ${hiveconf:population_input};

DROP TABLE IF EXISTS city_population;
CREATE EXTERNAL TABLE city_population (
     KEY_ID           INT,
     NAME             STRING,
     SIZE_TYPE        STRING,
     POPULATION       INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
STORED AS TEXTFILE LOCATION ${hiveconf:city_population_output};

FROM
  (SELECT
     c.KEY_ID,
     c.NAME,
     c.SIZE_TYPE,
     CASE
         WHEN p.POPULATION IS NULL
             THEN getDefaultPopulation(c.SIZE_TYPE, './defaultPopulation')
         ELSE p.POPULATION
     END AS POPULATION
   FROM
        city c
   LEFT OUTER JOIN
        population p
   ON
       (c.KEY_ID = p.KEY_ID)
  ) joined
INSERT OVERWRITE TABLE city_population
SELECT *
;

getDefaultPopulation UDF

Here is the Java code I used for the getDefaultPopulation UDF. It's very straightforward: I check whether the map is null; if it is, I initialize it from the map file, otherwise I just reuse it.

package test;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class GetDefaultPopulation extends UDF {
    // Built lazily on the first call and reused for every subsequent row in this task.
    private Map<String, Integer> populationMap;

    public Integer evaluate(String sizeType, String mapFile) throws HiveException {
        if (populationMap == null) {
            populationMap = new HashMap<String, Integer>();
            // The map file was registered with ADD FILE, so it is available
            // in the task's working directory on every node.
            try (BufferedReader lineReader = new BufferedReader(new FileReader(mapFile))) {
                String line;
                while ((line = lineReader.readLine()) != null) {
                    // Each line is "SIZE_TYPE<TAB>POPULATION".
                    String[] pair = line.split("\t");
                    String type = pair[0];
                    int population = Integer.parseInt(pair[1]);
                    populationMap.put(type, population);
                }
            } catch (FileNotFoundException e) {
                throw new HiveException(mapFile + " doesn't exist");
            } catch (IOException e) {
                throw new HiveException("processing file " + mapFile + " failed, please check the format");
            }
        }

        if (populationMap.containsKey(sizeType)) {
            return populationMap.get(sizeType);
        }

        // Unknown SIZE_TYPE: return NULL so the gap is visible in the output.
        return null;
    }
}

Note that the class extends Hive's UDF class, and your class must define at least one evaluate method; Hive looks this method up when you call the function in the script. After you finish the code, just export it to a JAR file in Eclipse or build the jar from the command line.
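
If you want to sanity-check the evaluate logic outside Hive before packaging the jar, a tiny local driver like the one below can help. This test class is my own addition (not part of the original post): it writes a small map file into the working directory, mimicking what ADD FILE makes available to each task, and calls evaluate() directly.

package test;

import java.io.FileWriter;

public class GetDefaultPopulationTest {
    public static void main(String[] args) throws Exception {
        // Write a tiny defaultPopulation file into the working directory.
        String mapFile = "defaultPopulation";
        FileWriter writer = new FileWriter(mapFile);
        writer.write("LARGE\t1000000\nMEDIUM\t500000\nSMALL\t100000\n");
        writer.close();

        GetDefaultPopulation udf = new GetDefaultPopulation();
        System.out.println(udf.evaluate("LARGE", mapFile));   // 1000000
        System.out.println(udf.evaluate("MEDIUM", mapFile));  // 500000, map is reused, file not re-read
        System.out.println(udf.evaluate("UNKNOWN", mapFile)); // null
    }
}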

19 thoughts on “Use a lookup HashMap in a Hive script with a UDF”

  1. Sanjeev

    Thanks for sharing the code. Is there any way we can use a global hash-map and keep updating it at run time? I have to develop a UDF that looks up values in a hash-map and checks if a value exists; if not, I'll populate it, but I'm not sure if it'll be visible on other nodes. Is it possible to do this in Hive?

    1. purplechun Post author

      Hi Sanjeev, I haven't met this use case before, so I don't have an answer for now. But I doubt it: multiple nodes run at the same time, and a hashmap you update on one node may not be propagated to the other nodes in time.

      Also, I believe the map file is added using DistributedCache; based on the doc, http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html, you cannot modify it during execution.

  2. Dhruv Kapur

    Hey, thanks for the post. I have a question… you mention:

    “Writing a UDF means writing Java code that can be called from the Hive script. Here, we will read the file in our UDF and build a HashMap, and the HashMap will only be built once.”

    Are you sure about the hashmap only being built once per node? Is there some documentation you could point me to? I am concerned that if this happens for every row, it will be a pretty slow operation…

    Thanks.. 🙂

      1. Abhishek

        Every time we call the UDF from Hive, the evaluate method gets called. And because the file reading and hash-map building are done inside the evaluate method, I don't understand how the hash map will only be built once?

        1. Abhishek

          Ohh, got it..
          The second time the evaluate method gets called, if (populationMap == null) will be false, hence no file read or hash-map building will take place. Am I correct?

  3. chinmay

    Thank you for this wonderful post. I had a similar requirement and was able to implement it very quickly because of your post.

      1. kang.kai

        I used your method, following these steps:
        1. add file /tmp/id_test.txt;
        2. add jar hdfs://nameservice1/user/hive/warehouse/bin/udfs/test.jar;
        3. create temporary function testFile as ‘com.imo.hive.common.udf.TestFile’;
        select testFile(‘123’, ‘./id_test.txt’) from test_udfs;
        But I got this error:
        Error: Error while compiling statement: FAILED: SemanticException [Error 10014]: Line 1:7 Wrong arguments ”./id_test.txt”: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String com.immomo.hive.common.udf.UDFTestFile.evaluate(java.lang.String,java.lang.String) throws org.apache.hadoop.hive.ql.metadata.HiveException on object com.immomo.hive.common.udf.UDFTestFile@638912c8 of class com.immomo.hive.common.udf.UDFTestFile with arguments {123:java.lang.String, ./id_test.txt:java.lang.String} of size 2 (state=42000,code=10014)

        1. purplechun Post author

          Seems like you passed in the wrong type of arguments; it would be helpful if you could paste how you declared the testFile method.

        2. Rajeev Atmakuri

          I am also getting the same error whenever all the passed arguments are constants.
          Is there any way to resolve this?

          1. yak

            It seems that if the query runs a MapReduce job, it will see the distributed cache, but if the query runs without a MapReduce job (e.g. select * from tbl limit 10), it will look at the local file system.

  4. Rohit Agrawal

    Really helpful post. I had a similar requirement and tried several other ways, but thanks to your post I finally achieved the desired result.

    Thanks a lot!!!

  5. Shashikant

    The above UDF works well when we have both the jar and the file on the edge node. But when I move them to HDFS for an Oozie workflow, it errors out with the error below. Please advise how the existing code can be modified to read files from an HDFS location inside the UDF.

    Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public boolean udf.cmb.UDF_reference.evaluate(java.lang.String,java.lang.String) throws org.apache.hadoop.hive.ql.metadata.HiveException on object udf.cmb.UDF_reference@7e7743ec of class udf.cmb.UDF_reference with arguments {CRRUnallocat:java.lang.String, hdfs:///CDQT/Conf/crrcode.list:java.lang.String} of size 2

