For the past few years I have been using custom jars for my MapReduce jobs. Because it is pure Java programming, it gives me a lot of flexibility, but it also leaves a lot of code to maintain, and most of my MapReduce jobs are just joins with a little spice in them, so moving to Hive may be a better path.
Problem
The MapReduce job I face here is a left outer join of two different datasets on the same key. Because it is an outer join, there will be null values, and for those nulls I want to look up default values to assign from a map.
For example, I have two datasets:
dataset 1: KEY_ID CITY SIZE_TYPE
dataset 2: KEY_ID POPULATION
I want a left outer join on KEY_ID; for those cities that cannot find a POPULATION, I will look up their SIZE_TYPE in a map and use the default value.
Of course I could treat the default value map as a third dataset in the join. The reason I don't want to is that datasets 1 and 2 are very large (say, millions of rows) while the map is very small (just a few hundred rows), so joining all three datasets would hurt performance.
In a custom jar, I can use the Distributed Cache to build this map. When I add the map file as a distributed cache file, it is shipped to every node, and each mapper or reducer can read it. So for each joined pair, I can simply use the SIZE_TYPE as the key to find its corresponding value.
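To make the comparison concrete, here is a minimal sketch of how this looks in a custom jar (using the newer mapreduce API; the class name, cache path, and field handling below are made up for illustration, not my actual job code):

package test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative sketch only: class, file, and path names are invented for this example.
public class CityJoinJob {

    public static class CityMapper extends Mapper<LongWritable, Text, Text, Text> {

        private final Map<String, Integer> defaults = new HashMap<String, Integer>();

        @Override
        protected void setup(Context context) throws IOException {
            // The "#defaultPopulation" fragment on the cache URI creates a symlink
            // with that name in the task's working directory, so the small map file
            // can be opened like a local file on every node.
            BufferedReader reader = new BufferedReader(new FileReader("./defaultPopulation"));
            String line;
            while ((line = reader.readLine()) != null) {
                String[] pair = line.split("\t");
                defaults.put(pair[0], Integer.parseInt(pair[1]));
            }
            reader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The real job would emit join keys here and use 'defaults' to fill in
            // populations for cities that have no match; details omitted.
            String[] fields = value.toString().split("\t");
            context.write(new Text(fields[0]), value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "city-population-join");
        job.setJarByClass(CityJoinJob.class);
        job.setMapperClass(CityMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Ship the small lookup file to every node via the distributed cache.
        job.addCacheFile(new URI("/user/lichu/defaultPopulation#defaultPopulation"));
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The small file rides along with the job, and each task builds its own in-memory map once in setup().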
In Hive, this is harder because we are writing a SQL-like script: the functionality is limited, and we cannot use a HashMap as easily as we do in Java.
Luckily, Hive supports UDFs (User-Defined Functions) and ADD FILE, and we can use them to do the same thing.
ADD FILE filename
in Hive is the equivalent of adding a distributed cache file in Java: the file will be distributed to every node.
Writing a UDF
means writing Java code that can be called from a Hive script. Here, we will read the file in our UDF and build a HashMap; the HashMap will only be built once.
Solution
Suppose we have two datasets and one map file:
City Dataset
1    New York    MEDIUM
2    Beijing     LARGE
3    Chicago     LARGE
4    Tempe       SMALL
5    LONDON      LARGE
Population Dataset
1    8000000
2    20693000
3    2700000
Default Population map file
LARGE     1000000
MEDIUM    500000
SMALL     100000
Hive Script
In the Hive script, I create three tables: two for the input datasets and one for the output.
As you can see in the code, I use ADD FILE /home/lichu/Downloads/defaultPopulation;
to add the defaultPopulation file to the distributed cache, and I use
ADD JAR /home/lichu/UDF.jar;
CREATE TEMPORARY FUNCTION getDefaultPopulation AS 'test.GetDefaultPopulation';
to add the UDF jar file (I will show how I write this function in the next section) and create a temporary function from the class in that package. Then I use the getDefaultPopulation function in the SELECT block.
-- author: Chun
-- email : me@lichun.cc

SET city_input = '/home/lichu/Downloads/city';
SET population_input = '/home/lichu/Downloads/population';
SET city_population_output = '/home/lichu/Downloads/city_population_output';

ADD FILE /home/lichu/Downloads/defaultPopulation;
ADD JAR /home/lichu/UDF.jar;
CREATE TEMPORARY FUNCTION getDefaultPopulation AS 'test.GetDefaultPopulation';

DROP TABLE IF EXISTS city;
CREATE EXTERNAL TABLE city (
    KEY_ID INT,
    NAME STRING,
    SIZE_TYPE STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION ${hiveconf:city_input};

DROP TABLE IF EXISTS population;
CREATE EXTERNAL TABLE population (
    KEY_ID INT,
    POPULATION INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION ${hiveconf:population_input};

DROP TABLE IF EXISTS city_population;
CREATE EXTERNAL TABLE city_population (
    KEY_ID INT,
    NAME STRING,
    SIZE_TYPE STRING,
    POPULATION INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION ${hiveconf:city_population_output};

FROM (
    SELECT c.KEY_ID,
           c.NAME,
           c.SIZE_TYPE,
           CASE
               WHEN p.POPULATION IS NULL THEN getDefaultPopulation(c.SIZE_TYPE, './defaultPopulation')
               ELSE p.POPULATION
           END AS POPULATION
    FROM city c
    LEFT OUTER JOIN population p ON (c.KEY_ID = p.KEY_ID)
) joined
INSERT OVERWRITE TABLE city_population
SELECT *;
getDefaultPopulation UDF
Here is the Java code I used for the getDefaultPopulation UDF. It is very straightforward; notice that I check whether the map is null: if it is, I initialize it from the map file, otherwise I just reuse it.
package test;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class GetDefaultPopulation extends UDF {

    // Built lazily from the distributed cache file on the first call,
    // then reused for every later call within the same task.
    private Map<String, Integer> populationMap;

    public Integer evaluate(String sizeType, String mapFile) throws HiveException {
        if (populationMap == null) {
            populationMap = new HashMap<String, Integer>();
            try {
                BufferedReader lineReader = new BufferedReader(new FileReader(mapFile));
                String line = null;
                while ((line = lineReader.readLine()) != null) {
                    // Each line is "SIZE_TYPE<TAB>POPULATION".
                    String[] pair = line.split("\t");
                    String type = pair[0];
                    int population = Integer.parseInt(pair[1]);
                    populationMap.put(type, population);
                }
                lineReader.close();
            } catch (FileNotFoundException e) {
                throw new HiveException(mapFile + " doesn't exist");
            } catch (IOException e) {
                throw new HiveException("process file " + mapFile + " failed, please check format");
            }
        }

        if (populationMap.containsKey(sizeType)) {
            return populationMap.get(sizeType);
        }
        return null;
    }
}
Note that I extend the UDF class, and your class must define at least one evaluate method; Hive looks for this method when you call the function in the script. After you finish the code, just export it to a JAR file in Eclipse or build the jar on the command line.
Thanks for sharing the code. Is there any way we can use a global hash map and keep updating it at run time? I have to develop a UDF that looks up values in a hash map and checks whether they exist; if not, I'll populate it, but I'm not sure whether the update will be visible on other nodes. Is it possible to do this in Hive?
Hi Sanjeev, I haven't run into this use case before, so I don't have an answer right now. But I doubt it: suppose multiple nodes are running at the same time and several hit this situation simultaneously; the hash map you update on one node may not propagate to another node in time.
Also, I believe the map file is added using DistributedCache, and based on the doc, http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html, you cannot modify it during execution.
Hey thanks for the post, I have a question… you mention:
“Writing a UDF means writing Java code that can be called from a Hive script. Here, we will read the file in our UDF and build a HashMap; the HashMap will only be built once.”
Are you sure the hash map is only built once per node? Is there some documentation you could point me to? I am concerned that if this happens for every row it will be a pretty slow operation…
Thanks.. 🙂
It won’t, I did a test before 🙂
Okay thanks!
How did you test though? I’m interested in looking at it.
Every time we call the UDF from Hive, the evaluate method gets called. And because the file reads and hash map building are done inside the evaluate method, I didn't understand how the hash map will only be built once?
ohh got it..
The second time the evaluate method gets called, if (populationMap == null) will be false, hence no file read or hash map building will take place. Am I correct?
Thank you for this wonderful post. I had a similar requirement and was able to implement it very quickly because of your post.
glad to know this post helps 🙂
I used your method following these steps:
1. add file /tmp/id_test.txt;
2. add jar hdfs://nameservice1/user/hive/warehouse/bin/udfs/test.jar;
3. create temporary function testFile as 'com.imo.hive.common.udf.TestFile';
select testFile('123', './id_test.txt') from test_udfs;
But I got this error:
Error: Error while compiling statement: FAILED: SemanticException [Error 10014]: Line 1:7 Wrong arguments ”./id_test.txt”: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String com.immomo.hive.common.udf.UDFTestFile.evaluate(java.lang.String,java.lang.String) throws org.apache.hadoop.hive.ql.metadata.HiveException on object com.immomo.hive.common.udf.UDFTestFile@638912c8 of class com.immomo.hive.common.udf.UDFTestFile with arguments {123:java.lang.String, ./id_test.txt:java.lang.String} of size 2 (state=42000,code=10014)
Seems like you passed in the wrong type of arguments; it would be helpful if you could paste how you declared the testFile method.
I’m getting this too, albeit in a different UDF. Please reply with the solution you found.
I am also getting the same error whenever all the passed arguments are constants.
Is there any way to resolve this?
Same error. Has this code ever worked for anyone?
It seems that if the query runs a MapReduce job, it will see the distributed cache, but if the query runs without a MapReduce job (e.g., select * from tbl limit 10), it will see the local file system.
Really helpful post. I had a similar requirement and tried several other ways, but thanks to your post I finally achieved the desired result.
Thanks a lot!!!
Glad to be helpful 🙂
Is there a way we can read the file from HDFS inside a UDF, instead of the local FS?
The above UDF works well when we have both the jar and the file on the edge node. But when I move it to HDFS for an Oozie workflow, it errors out with the error below. Please advise how the existing code can be modified to read files from an HDFS location inside the UDF.
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public boolean udf.cmb.UDF_reference.evaluate(java.lang.String,java.lang.String) throws org.apache.hadoop.hive.ql.metadata.HiveException on object udf.cmb.UDF_reference@7e7743ec of class udf.cmb.UDF_reference with arguments {CRRUnallocat:java.lang.String, hdfs:///CDQT/Conf/crrcode.list:java.lang.String} of size 2
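I haven't tested this in an Oozie setup, but one possible direction is to open the file through Hadoop's FileSystem API instead of FileReader, so an hdfs:// path can be read directly. A minimal sketch under that assumption (the class name and error handling are illustrative, not the original UDF):

package test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.metadata.HiveException;

// Hypothetical variant of GetDefaultPopulation that reads the map file
// through the Hadoop FileSystem API, so both local and hdfs:// paths work.
public class GetDefaultPopulationFromHdfs extends UDF {

    private Map<String, Integer> populationMap;

    public Integer evaluate(String sizeType, String mapFile) throws HiveException {
        if (populationMap == null) {
            populationMap = new HashMap<String, Integer>();
            try {
                Path path = new Path(mapFile); // e.g. hdfs:///CDQT/Conf/crrcode.list
                FileSystem fs = path.getFileSystem(new Configuration());
                BufferedReader lineReader =
                        new BufferedReader(new InputStreamReader(fs.open(path)));
                String line;
                while ((line = lineReader.readLine()) != null) {
                    String[] pair = line.split("\t");
                    populationMap.put(pair[0], Integer.parseInt(pair[1]));
                }
                lineReader.close();
            } catch (IOException e) {
                throw new HiveException("cannot read map file " + mapFile, e);
            }
        }
        return populationMap.get(sizeType);
    }
}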