Hive – Selecting columns with regular expression

Hive has a rather unique feature that allows selecting columns by
regular expression instead of listing them by name.

It’s very useful when we need to select all columns except one. In most SQL databases we would have to spell out every remaining column, but in Hive this feature can save us the typing.

Let’s say there is a people table with columns name, age, city, country and created_at. To select all columns except created_at we can write:

set hive.support.quoted.identifiers=none;
 
select 
    `(created_at)?+.+`
from people
limit 10;

This is equivalent to:

select
    name, age, city, country
from people
limit 10;

Please note that in Hive 0.13 or later you have to set hive.support.quoted.identifiers to none.
I have never seen such functionality in other SQL databases.
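
The same trick extends to excluding several columns at once; for example, to drop both created_at and country from the same people table:

select
    `(created_at|country)?+.+`
from people
limit 10;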

References

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

Spark SQL

This is one of the Hive-specific features that are not available in Spark SQL.

Hive gotchas – Order By

There is this one feature in Hive that I really hate: ORDER BY col_index.

Historically, the ORDER BY clause accepted only column aliases, as in the simple example below:

select id, name from people order by name;

+------------+--------------+--+
| people.id  | people.name  |
+------------+--------------+--+
| 5          | Jimmy        |
| 2          | John         |
| 1          | Kate         |
| 4          | Mike         |
| 3          | Sam          |
+------------+--------------+--+

In other relational databases it is possible to give not only a column alias but also a column index. It is much simpler to say “column 3” than to type the whole name or alias. This option was not supported in Hive at the beginning, but the community noticed that and a ticket was created.

Since Hive 0.11.0 it is possible to order the result by column index as well; however, there is a gotcha here. There is a property that enables this new option: hive.groupby.orderby.position.alias must be set to ‘true’. The problem is that by default it is set to ‘false’, and in that case you can still use numbers in the ORDER BY clause, but they are interpreted literally (as numbers), not as column indexes, which is rather strange.

So, for example, in any modern Hive version, if you do something like this:

select id, name from people order by 2;

+------------+--------------+--+
| people.id  | people.name  |
+------------+--------------+--+
| 1          | Kate         |
| 2          | John         |
| 3          | Sam          |
| 4          | Mike         |
| 5          | Jimmy        |
+------------+--------------+--+

As you can see, by default the 2 was interpreted as “value 2”, not as “column number 2”. After enabling the option you can change how ORDER BY works:

set hive.groupby.orderby.position.alias=true;
select id, name from people order by 2;

+-----+--------+--+
| id  |  name  |
+-----+--------+--+
| 5   | Jimmy  |
| 2   | John   |
| 1   | Kate   |
| 4   | Mike   |
| 3   | Sam    |
+-----+--------+--+

So this time, after enabling the option, we can use the column number to sort by name. The problem is that whenever you work in Hive you have to check whether hive.groupby.orderby.position.alias is enabled in the current session or not. This makes it rather impractical and limits the usage of this syntactic sugar. Moreover, I cannot really see any use case for ORDER BY <value>.

References

Hive Order By – https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Hive table properties

It was a surprise for me to find these two properties – very useful when dealing with large numbers of files generated by other systems. I couldn’t find them in the Hive docs, but you can come across these settings on forums.

skip.header.line.count tells how many lines of the file should be skipped. It is useful when you read CSV files and the first line contains a header with column names. It works with text files (ROW FORMAT DELIMITED FIELDS TERMINATED BY…) and with the CSV SerDe. There is also a complementary setting that allows skipping footer lines: skip.footer.line.count. The problem, however, is that Spark doesn’t recognize those properties, so be careful when you plan to read the table later via the Spark HiveContext.
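
A minimal sketch of a table definition using these properties (the table and column names are made up for illustration):

CREATE TABLE csv_with_header(
    id INT,
    name STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES('skip.header.line.count'='1',
              'skip.footer.line.count'='1');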

Speaking of Hive table properties, the following setting may also be very useful.

serialization.null.format is another table property, which defines how NULL values are encoded in the text file. Here is an example of using the string "null" as the NULL marker:

CREATE TABLE table_null(
     s1 STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES('serialization.null.format'='null');

So whenever a field contains the string "null", it will be interpreted as a NULL value.
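
A quick way to verify the behaviour, assuming a data file for table_null in which one row contains the literal string null:

-- hypothetical data file content: one row with 'abc', one row with 'null'
SELECT s1, s1 IS NULL AS is_null FROM table_null;
-- the 'abc' row gives is_null = false, the 'null' row is read back as NULL (is_null = true)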

Field separator escape

One more useful property that comes in handy when dealing with text files is escape.delim. This property allows setting a custom character that will be used to escape the separator in column values:

CREATE TABLE table_escape(
    s1 STRING,
    s2 STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES('escape.delim'='\\');

(We use a backslash as the escape character, but it has to be escaped as in Java.)

In such a case, the following data file:

aaaa,bbbb
aaa\,bbb,cc

will be interpreted as:

0: jdbc:hive2://localhost:10000> select * from table_escape
0: jdbc:hive2://localhost:10000> ;
+------------------+------------------+--+
| table_escape.s1  | table_escape.s2  |
+------------------+------------------+--+
| aaaa             | bbbb             |
| aaa,bbb          | cc               |
+------------------+------------------+--+

Custom line breaks

There is also a syntax that allows splitting records with a character other than the newline:

CREATE TABLE table_lines(
    s1 STRING,
    s2 STRING
) ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
LINES TERMINATED BY '|';

However, it is currently not supported (I tried it on Hive 1.2.1):

Error: Error while compiling statement: FAILED: SemanticException 5:20 LINES TERMINATED BY only supports newline '\n' right now. Error encountered near token ''|'' (state=42000,code=40000)

Binary formats

Generally speaking, text files should rather be avoided on Hadoop, because binary columnar formats usually give better performance. Nevertheless, CSV or other plain-text files are quite often found as input from external systems. In such cases it is good to have these formatting options, so we can easily start using Hive in an existing ecosystem without too much hassle.

Hive partition filtering

Some time ago I found a pitfall related to partition filtering and user-defined functions.

Let’s assume we have a partitioned table that looks like this:

CREATE TABLE pageviews(
... 
) PARTITIONED BY (v_date STRING)
... /other properties/
;

Now let’s assume we usually access yesterday’s data, so for convenience we decided to create a view that contains only page views from yesterday’s partition:

CREATE VIEW yesterday_pageviews AS
SELECT * FROM pageviews
WHERE v_date = date_add(from_utc_timestamp(unix_timestamp()*1000,'UTC'),-1)

You can check that the expression used in the WHERE clause works as intended. It always returns the current date minus one day.

hive> SELECT date_add(from_utc_timestamp(unix_timestamp()*1000,'UTC'),-1);
OK
2015-06-16
Time taken: 0.422 seconds, Fetched: 1 row(s)

This view (yesterday_pageviews) returns the correct records, but the problem is that it’s very slow. As you can see in the explain plan, the amount of data suggests that the whole table is read instead of a single partition.

> EXPLAIN select count(*) from yesterday_pageviews;
...
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: test_table
            Statistics: Num rows: 505000 Data size: 50500000 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (v_date = date_add(Converting field (unix_timestamp() * 1000) from UTC to timezone: 'UTC', -1)) (type: boolean)
              Statistics: Num rows: 252500 Data size: 25250000 Basic stats: COMPLETE Column stats: NONE
              Select Operator

As you can see, the WHERE clause was pushed down to the Filter Operator, and ~50 MB is the total table size:

$ hdfs dfs -du -s /apps/hive/warehouse/test_table/
50500000  /apps/hive/warehouse/test_table

Deterministic functions

The root cause of this problem is the unix_timestamp() function, which is declared as non-deterministic. See the source on GitHub:

@UDFType(deterministic = false)
@Description(name = "unix_timestamp",
 value = "_FUNC_([date[, pattern]]) - Returns the UNIX timestamp",
 extended = "Converts the current or specified time to number of seconds "
 + "since 1970-01-01.")
public class GenericUDFUnixTimeStamp extends GenericUDFToUnixTimeStamp {
...

deterministic = true means that each call (within one query execution) should return the same value. Because unix_timestamp is not deterministic, Hive cannot evaluate it in advance to determine the partition key and will try to filter row by row instead. This, of course, causes a full table scan.

Solution

The real problem is that prior to Hive 1.2 you don’t have many options: unix_timestamp() is the only function that gives the current date/time. You can try creating your own UDF declared as deterministic, and it should work as expected.
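
A minimal sketch of that workaround, assuming a custom UDF annotated with @UDFType(deterministic = true) has already been compiled (the jar path and the class name com.example.udf.DeterministicCurrentDate are made up for illustration):

ADD JAR /tmp/custom-udfs.jar;  -- hypothetical jar containing the deterministic UDF
CREATE TEMPORARY FUNCTION current_date_det AS 'com.example.udf.DeterministicCurrentDate';

CREATE OR REPLACE VIEW yesterday_pageviews AS
SELECT * FROM pageviews
WHERE v_date = date_add(current_date_det(), -1);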

In Hive 1.2 there are two new functions:

  • current_date
  • current_timestamp

and both are declared as deterministic, so they can be used in this scenario. In the explain section below you can see that only one partition is read:

hive> CREATE OR REPLACE VIEW yesterday_pageviews AS 
    > SELECT * FROM pageviews WHERE v_date=date_add(current_date(),-1);

...

hive> EXPLAIN SELECT count(*) FROM yesterday_pageviews;

...

STAGE PLANS:
  Stage: Stage-1
    Tez
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE)
      DagName: root_20150617215157_ebaf02bb-0d27-4429-89fa-7f4292774c4b:1
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: test_table
                  filterExpr: (v_date = '2015-06-03') (type: boolean)
                  Statistics: Num rows: 1 Data size: 10100000 Basic stats: PARTIAL Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 1 Data size: 10100000 Basic stats: PARTIAL Column stats: NONE
                    Group By Operator

It’s worth mentioning that the unix_timestamp behaviour was described in the HIVE-10728 issue, and this function will be redefined as deterministic (and marked as deprecated) in future releases.

Hive Virtual Columns

Like many other databases, Hive provides virtual columns – columns that are not stored in the data files but can be accessed in queries. Usually they provide some metadata that can be very handy.

In this post I will describe what virtual columns are available in Hive 1.0 and show how it works on several examples.

Sample data set

Let’s generate some data first. Our table will have a single column holding one character, and there will be 200,000,000 records. I wanted the records to be as short as possible. Our data file will occupy 400,000,000 bytes, which is 3 HDFS blocks.

The data was generated in the following way:

$ cat /dev/urandom | tr -cd '0-9a-zA-Z' | fold -b -w 1 | head -n 200000000 > one_char.txt

The file content looks like this:

$ head one_char.txt
c
U
9
Z
k
C
W
0
m
i

Now, let’s create a few Hive tables with different formats. The basic table is defined in the following way:

CREATE TABLE jp_one_char(c string) 
ROW FORMAT DELIMITED;
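
The generated file can then be loaded into this table; a minimal sketch, assuming one_char.txt sits in the local /tmp directory:

LOAD DATA LOCAL INPATH '/tmp/one_char.txt' INTO TABLE jp_one_char;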

After loading the previously generated file, let’s populate the Parquet table:

CREATE TABLE jp_one_char_parquet(c string) 
STORED AS PARQUET;

INSERT INTO TABLE jp_one_char_parquet 
SELECT * FROM jp_one_char;

ORC table:

CREATE TABLE jp_one_char_orc(c string) 
STORED AS ORC;

INSERT INTO TABLE jp_one_char_orc
SELECT * FROM jp_one_char;

and sequence file table:

CREATE TABLE jp_one_char_seq(c string) 
STORED AS SEQUENCEFILE;

INSERT INTO TABLE jp_one_char_seq
SELECT * FROM jp_one_char;

Virtual columns

Please note that the virtual columns contain a double underscore (“__”) as the word separator in their names.

INPUT__FILE__NAME – this is pretty straightforward. When displaying this column you will see the file on HDFS that contains the data. This can be useful when debugging – you can find the file that contains some strange data, for example. But it may also be helpful in finding the file location for an external table or partition (otherwise you would have to examine the table/partition properties: DESCRIBE EXTENDED … PARTITION(…)).
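
A quick illustration on the sample text table (the returned paths will of course depend on your warehouse location):

SELECT INPUT__FILE__NAME, c
FROM jp_one_char
LIMIT 3;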

BLOCK__OFFSET__INSIDE__FILE – according to the documentation it should give the offset of the data block. It seems, however, that this behaviour depends on the file format.

For example, in a text file (compressed or not) this column displays the offset of the record in the file.

> SELECT BLOCK__OFFSET__INSIDE__FILE, *
> FROM jp_one_char
> LIMIT 20;
+------------------------------+----------------+--+
| block__offset__inside__file  | jp_one_char.c  |
+------------------------------+----------------+--+
| 0                            | c              |
| 2                            | U              |
| 4                            | 9              |
| 6                            | Z              |
| 8                            | k              |
| 10                           | C              |
| 12                           | W              |
| 14                           | 0              |
| 16                           | m              |
| 18                           | i              |
| 20                           | 5              |
| 22                           | z              |
| 24                           | T              |
| 26                           | Z              |
| 28                           | P              |
| 30                           | 3              |
| 32                           | 2              |
| 34                           | s              |
| 36                           | 4              |
| 38                           | e              |
+------------------------------+----------------+--+

Each row occupies 2 bytes, which is correct (one character plus the newline). So, as you can see, this column holds the offset of the row. Let’s make sure that there are no duplicates within the file:

> SELECT offset, count(*)
> from (
>   SELECT BLOCK__OFFSET__INSIDE__FILE offset, *
>   FROM jp_one_char
> ) t
> GROUP BY offset
> HAVING COUNT(*) >1;

...

+---------+------+--+
| offset  | _c1  |
+---------+------+--+
+---------+------+--+
No rows selected (693.016 seconds)

No duplicates, so this is unique within a file, as expected. That’s nice, because we can use it to create row IDs.

But you have to be careful, as it depends on the file format. In Parquet, ORC and sequence files BLOCK__OFFSET__INSIDE__FILE is not unique within a file and it’s hard to determine what it actually means.
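
For text-format tables, though, a minimal sketch of building a surrogate row ID by combining the file name with the row offset could look like this:

SELECT concat(INPUT__FILE__NAME, ':',
              cast(BLOCK__OFFSET__INSIDE__FILE AS STRING)) AS row_id,
       c
FROM jp_one_char
LIMIT 3;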

ROW__OFFSET__INSIDE__BLOCK – To access this column you have to enable it first by setting:

set hive.exec.rowoffset=true;

but it always produced 0, anyway.
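
For reference, this is the kind of query I used to check it on the text table (after enabling the property above):

SELECT ROW__OFFSET__INSIDE__BLOCK, c
FROM jp_one_char
LIMIT 3;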

GROUPING__ID – It can be helpful when dealing with GROUP BY … WITH CUBE/ROLLUP queries. This column provides the binary value of the grouping set (a bit is 0 if the given grouping column is aggregated away, 1 if the grouping column has a value).

Let’s use another sample table:

> select * from jp_grouping_sample;
+------------------------+------------------------+------------------------+---------------------------+--+
| jp_grouping_sample.c1  | jp_grouping_sample.v1  | jp_grouping_sample.v2  | jp_grouping_sample.value  |
+------------------------+------------------------+------------------------+---------------------------+--+
| a                      | 1                      | 1                      | 1234                      |
| a                      | 2                      | 1                      | 404                       |
| a                      | 2                      | 1                      | 40                        |
| b                      | 1                      | 1                      | 0                         |
| b                      | 1                      | 2                      | 10                        |
+------------------------+------------------------+------------------------+---------------------------+--+
5 rows selected (0.768 seconds)

When we GROUP BY … WITH CUBE, we can use GROUPING__ID to get the binary value of the columns that were used:

> SELECT GROUPING__ID, c1, v1, v2,  count(*)
> FROM jp_grouping_sample
> GROUP BY c1, v1, v2 WITH CUBE;

+---------------+-------+-------+-------+------+--+
| grouping__id  |  c1   |  v1   |  v2   | _c4  |
+---------------+-------+-------+-------+------+--+
| 0             | NULL  | NULL  | NULL  | 5    |
| 4             | NULL  | NULL  | 1     | 4    |
| 4             | NULL  | NULL  | 2     | 1    |
| 2             | NULL  | 1     | NULL  | 3    |
| 6             | NULL  | 1     | 1     | 2    |
| 6             | NULL  | 1     | 2     | 1    |
| 2             | NULL  | 2     | NULL  | 2    |
| 6             | NULL  | 2     | 1     | 2    |
| 1             | a     | NULL  | NULL  | 3    |
| 5             | a     | NULL  | 1     | 3    |
| 3             | a     | 1     | NULL  | 1    |
| 7             | a     | 1     | 1     | 1    |
| 3             | a     | 2     | NULL  | 2    |
| 7             | a     | 2     | 1     | 2    |
| 1             | b     | NULL  | NULL  | 2    |
| 5             | b     | NULL  | 1     | 1    |
| 5             | b     | NULL  | 2     | 1    |
| 3             | b     | 1     | NULL  | 2    |
| 7             | b     | 1     | 1     | 1    |
| 7             | b     | 1     | 2     | 1    |
+---------------+-------+-------+-------+------+--+
20 rows selected (41.555 seconds)

For example, the last row has 7 in GROUPING__ID, because all 3 grouping columns have values (b, 1, 2), so 2^0 + 2^1 + 2^2 = 7.
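
Since GROUPING__ID is a bit vector, it can also be used to filter specific grouping sets. A minimal sketch, assuming the bit ordering shown in the output above (the first grouping column, c1, maps to the least-significant bit):

SELECT c1, v1, v2, cnt
FROM (
    SELECT GROUPING__ID AS gid, c1, v1, v2, count(*) AS cnt
    FROM jp_grouping_sample
    GROUP BY c1, v1, v2 WITH CUBE
) t
WHERE (cast(gid AS INT) & 1) = 1;  -- keep only rows where c1 has a concrete value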

When browsing the Hive sources I noticed two other virtual columns, but it was hard to find any information about them on the web:

ROW__ID – This one is interesting. It gives NULL in all tables except for tables in ORC format:

> SELECT ROW__ID, * FROM jp_one_char_orc LIMIT 5;
+---------------------------------------------+--------------------+--+
|                   row__id                   | jp_one_char_orc.c  |
+---------------------------------------------+--------------------+--+
| {"transactionid":0,"bucketid":0,"rowid":0}  | w                  |
| {"transactionid":0,"bucketid":0,"rowid":1}  | k                  |
| {"transactionid":0,"bucketid":0,"rowid":2}  | Y                  |
| {"transactionid":0,"bucketid":0,"rowid":3}  | 2                  |
| {"transactionid":0,"bucketid":0,"rowid":4}  | V                  |
+---------------------------------------------+--------------------+--+
5 rows selected (0.691 seconds)

In an ORC table ROW__ID returns a structure with three values: transactionid, bucketid and rowid. I couldn’t find much information about those values, but it seems that bucketid is the number of the HDFS block, and rowid is a unique ID, but only within an HDFS block.

RAW__DATA__SIZE – This one was actually not visible to Hive (1.0). Maybe it is still not implemented, or some additional property has to be set?

References:

  • Hive Documentation on Virtual Columns;
  • Programming Hive, Edward Capriolo, Dean Wampler, Jason Rutherglen;
  • Apache Hive