Timestamps in Parquet on Hadoop

One of the main advantages of the open source Hadoop environment is that we are free to choose the tools that make up our Big Data platform. No matter what software distribution you decide to use, most of the time you can freely customise it by adding extra frameworks or upgrading versions on your own. This free choice of products gives great flexibility, but it can also cause a lot of difficulties when orchestrating the different parts together. In this post I’d like to share some of the problems with handling timestamps in Parquet files.
Timestamp is a commonly used and supported data type. You can find it in most frameworks, but it turns out that tools can store and interpret it quite differently, which can end up in wrong results or even hours spent debugging your data workflow.

Timestamp in Hive

Hive has supported Timestamp since version 0.8. Timestamps are interpreted as being in the local time zone (so the actual value is stored in the Parquet file as UTC) [4]. When timestamps are read from the file, the server’s time zone is applied to the value to give the local timestamp. Of course, such behaviour depends on the file format: the text file format doesn’t imply any conversion to UTC.
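This store-as-UTC, read-as-local behaviour can be sketched in a few lines. This is a simplified model, not Hive code; the Europe/Warsaw server zone and the function names are just illustrative assumptions:

```python
# Simplified model of Hive's Parquet timestamp convention (not Hive code).
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

SERVER_TZ = ZoneInfo("Europe/Warsaw")  # hypothetical server time zone

def hive_write(local_dt: datetime) -> datetime:
    """Hive interprets the value as local time and stores it as UTC."""
    return local_dt.replace(tzinfo=SERVER_TZ).astimezone(timezone.utc)

def hive_read(stored_utc: datetime) -> datetime:
    """On read, the server's time zone is applied to recover the local value."""
    return stored_utc.astimezone(SERVER_TZ).replace(tzinfo=None)

local = datetime(2017, 6, 1, 12, 0, 0)   # what the user wrote
stored = hive_write(local)               # what lands in the file (UTC)
assert stored.hour == 10                 # Warsaw is UTC+2 in June (DST)
assert hive_read(stored) == local        # round trip is lossless
```

The round trip is lossless only as long as the file is written and read with the same server time zone.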

Timestamp in Impala

In Impala timestamps are saved in the local time zone, which is different than in Hive. Because historically Impala-Hive interoperability was very important, there are some workarounds to make the coexistence of these two frameworks possible. The following impalad start-up parameter adds proper handling for timestamps in Hive-generated Parquet files:
convert_legacy_hive_parquet_utc_timestamps=true (default false) [2]
It is worth mentioning that the Parquet file metadata is used to determine whether the file was created in Hive or not. The parquet-tools meta <file> command is helpful to see the creator of the file.

There is also a Hive option to allow reading Impala’s files. The parameter hive.parquet.timestamp.skip.conversion is set to true by default, and it means that Parquet files created in Impala won’t have the time zone applied, because the timestamps are already saved in the local time zone.

Timestamp in Spark

Spark-Hive interoperability is fine: every time we read a timestamp column we get the correct timestamp. The problem begins when we read in Spark tables created in Impala. In such a case Spark applies the server time zone to a file which already has local timestamps, and as a result we get different timestamps.
The main problem is that Spark (up to the newest version, 2.2.0) doesn’t provide any special handling for Impala Parquet files. So every time we have scripts in Impala that process data later used in Spark, we need to stay aware of these problems. Keep in mind that there are various Impala and Hive parameters that can influence the timestamp adjustments.
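The mismatch can be modelled as follows. Again, this is a simplified sketch, not actual Spark or Impala code, with a hypothetical Europe/Warsaw server zone:

```python
# Toy model of Spark misreading an Impala-written Parquet timestamp.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

SERVER_TZ = ZoneInfo("Europe/Warsaw")  # hypothetical server time zone

def impala_write(local_dt: datetime) -> datetime:
    """Impala stores the local wall-clock time as-is, no UTC conversion."""
    return local_dt

def spark_read(stored_dt: datetime) -> datetime:
    """Spark assumes the stored value is UTC and converts it to server time."""
    return stored_dt.replace(tzinfo=timezone.utc).astimezone(SERVER_TZ).replace(tzinfo=None)

original = datetime(2017, 6, 1, 12, 0, 0)
seen_by_spark = spark_read(impala_write(original))
assert seen_by_spark == datetime(2017, 6, 1, 14, 0, 0)  # shifted by +2 hours
```

The shift equals the server’s UTC offset, so the same file can even yield different values depending on where it is read.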


Even more problems arise if we add Sqoop to the workflow. Sqoop stores timestamps in Parquet as INT64, which makes the imported Parquet file incompatible with Hive and Impala: these two tools return errors when reading Sqoop’s Parquet files with timestamps. The funny thing is that Spark reads such files correctly without problems.
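For illustration, INT64 timestamps of this kind are plain epoch milliseconds (the TIMESTAMP_MILLIS logical type). A minimal sketch, assuming UTC semantics for the stored value:

```python
# Minimal sketch of an INT64 epoch-millis timestamp (TIMESTAMP_MILLIS),
# assuming the value is interpreted as UTC.
from datetime import datetime, timezone

def to_epoch_millis(dt: datetime) -> int:
    """Milliseconds since 1970-01-01 00:00:00 UTC."""
    return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)

assert to_epoch_millis(datetime(1970, 1, 1, 0, 0, 1)) == 1000
assert to_epoch_millis(datetime(1970, 1, 2, 0, 0, 0)) == 86_400_000
```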

Timestamp in Parquet

Parquet is one of the most popular columnar formats and is supported by most of the processing engines available on Hadoop. Its physical data types include only BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE and BYTE_ARRAY [1]. Timestamp is defined as a logical type (TIMESTAMP_MILLIS, TIMESTAMP_MICROS) [5], but since Impala stores timestamps up to nanosecond precision, it was decided to use INT96. Other frameworks followed Impala in using INT96, but time zone interpretation compatibility was somehow missed.
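The INT96 layout itself can be sketched as follows: 8 little-endian bytes holding the nanoseconds within the day, followed by 4 little-endian bytes holding the Julian day number. This is a simplified encoder for illustration, not a Parquet library API:

```python
# Simplified INT96 timestamp encoder/decoder (illustration, not a library API):
# 8 LE bytes of nanoseconds-of-day + 4 LE bytes of Julian day number.
import struct
from datetime import datetime, date

JULIAN_EPOCH = 2440588  # Julian day number of 1970-01-01

def to_int96(dt: datetime) -> bytes:
    julian_day = JULIAN_EPOCH + (dt.date() - date(1970, 1, 1)).days
    nanos = (dt.hour * 3600 + dt.minute * 60 + dt.second) * 10**9 + dt.microsecond * 1000
    return struct.pack("<qi", nanos, julian_day)  # 12 bytes total

def from_int96(raw: bytes):
    nanos, julian_day = struct.unpack("<qi", raw)
    return julian_day, nanos

raw = to_int96(datetime(1970, 1, 2, 0, 0, 1))
assert len(raw) == 12
assert from_int96(raw) == (JULIAN_EPOCH + 1, 10**9)
```

Note that the 12 bytes carry no time zone information at all, which is exactly why each engine is free to interpret them differently.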

In SQL database

So, how is it done in SQL databases?
In Oracle, for example, we have TIMESTAMP for storing a timestamp without time zone information but with defined precision, TIMESTAMP WITH TIME ZONE, and TIMESTAMP WITH LOCAL TIME ZONE (where timestamps are stored in the DB time zone and converted to the session time zone when returned to the client) [7].
In Postgres we have two options: TIMESTAMP (without time zone information) and TIMESTAMP WITH TIME ZONE (which is stored as UTC and converted to the local time zone after reading) [8].
It could be helpful to have such choice on Hadoop.


Because Hadoop is an open ecosystem with multiple independently developed components, it is sometimes possible to find areas with incompatibilities between components. Handling timestamps is a good example of such a problem. Although there are some workarounds to this known issue, some cases are still quite hard to detect and overcome. It is also a good example of the typical difficulties with a complex and open environment when compared to a product designed and developed by a single vendor.



Hive – Selecting columns with regular expression

In Hive there is a rather unique feature that allows selecting columns by regular expression instead of by name.
It’s very useful when we need to select all columns except one. In most SQL databases we would have to list all the columns, but in Hive this feature can save us some typing.
Let’s say there is a people table with columns name, age, city, country and created_at. To select all columns except created_at we can write:

set hive.support.quoted.identifiers=none;
select `(created_at)?+.+`
from people
limit 10;

This is equivalent to:

select
    name, age, city, country
from people
limit 10;

Please note that in Hive 0.13 or later you have to set hive.support.quoted.identifiers to none.
I have never seen such functionality in other SQL databases.



Spark SQL

This is one of the Hive-specific features that are not available in Spark SQL.

Hive gotchas – Order By

There is this one feature in Hive that I really hate: ORDER BY col_index.
Historically the ORDER BY clause accepted only column aliases, as in the simple example below:

select id, name from people order by name;
| people.id  | people.name  |
| 5          | Jimmy        |
| 2          | John         |
| 1          | Kate         |
| 4          | Mike         |
| 3          | Sam          |

In other relational databases it is possible to give not only a column alias but also a column index. It is much simpler to say “column 3” than to type the whole name or alias. This option was not supported in Hive at the beginning, but the community noticed that and a ticket was created.
Since Hive 0.11.0 it is possible to order the result by column index as well, however there is a gotcha here. There is a property that enables this new option: hive.groupby.orderby.position.alias must be set to ‘true’. The problem is that by default it is set to ‘false’, and in that case you can still use numbers in the order by clause, but they are interpreted literally (as numbers), not as column indexes, which is rather strange.
So, for example, in any modern Hive version you can do something like this:

select id, name from people order by 2;
| people.id  | people.name  |
| 1          | Kate         |
| 2          | John         |
| 3          | Sam          |
| 4          | Mike         |
| 5          | Jimmy        |

As you can see, by default it was interpreted as “value 2”, not “column number 2”. After enabling the option you can change how the order by works:

set hive.groupby.orderby.position.alias=true;
select id, name from people order by 2;

| id  |  name  |
| 5   | Jimmy  |
| 2   | John   |
| 1   | Kate   |
| 4   | Mike   |
| 3   | Sam    |

So this time, after enabling the option, we can use the column number to sort by name. The problem is that whenever you work in Hive you have to check whether hive.groupby.orderby.position.alias is enabled in the current session. This makes the feature rather impractical and limits the usage of this syntactic sugar. Moreover, I cannot really see any use case for ORDER BY <value>.
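The two interpretations of ORDER BY 2 can be modelled in a few lines; this is a toy model, not Hive internals:

```python
# Toy model of Hive's two interpretations of "ORDER BY 2" (not Hive internals).
def order_by_2(rows, position_alias_enabled):
    if position_alias_enabled:
        return sorted(rows, key=lambda r: r[1])  # treat 2 as 1-based column index
    return sorted(rows, key=lambda r: 2)         # constant key: order is unchanged

rows = [(1, "Kate"), (5, "Jimmy"), (2, "John")]
assert order_by_2(rows, False) == rows  # sorting by a literal changes nothing
assert order_by_2(rows, True) == [(5, "Jimmy"), (2, "John"), (1, "Kate")]
```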


Hive Order By – https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Hive table properties

It was a surprise for me to find these two properties, which are very useful when dealing with large numbers of files generated in other systems. I couldn’t find them in the Hive docs, but you can come across these settings on forums.
skip.header.line.count tells how many lines from the file should be skipped. It is useful when you read CSV files and the first line contains a header with column names. It works with the text file format (ROW FORMAT DELIMITED FIELDS TERMINATED BY…) and with the CSV SerDe. There is also a complementary setting that allows skipping footer lines: skip.footer.line.count. The problem, however, is that Spark doesn’t recognize those properties, so be careful when you plan to read the table later via Spark HiveContext.
Speaking about Hive table properties, the following setting may also be very useful.
serialization.null.format is another table property, which defines how NULL values are encoded in the text file. Here is an example of using the “null” string as the NULL marker:

CREATE TABLE table_null(
    s1 STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES('serialization.null.format'='null');
So whenever a field contains the string “null”, it will be interpreted as a NULL value.

Field separator escape

One more useful property that can be used when dealing with text files is escape.delim. This property allows setting a custom character that will be used to escape the separator in column values:

CREATE TABLE table_escape(
    s1 STRING,
    s2 STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
ESCAPED BY '\\';
(We use a backslash to escape the delimiter, but it has to be escaped itself, as in Java.)
In such a case the following data file:

aaaa,bbbb
aaa\,bbb,cc

will be interpreted as:

0: jdbc:hive2://localhost:10000> select * from table_escape;
| table_escape.s1  | table_escape.s2  |
| aaaa             | bbbb             |
| aaa,bbb          | cc               |
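The escaping rule can be sketched as a small parser; this is a simplified model of what the SerDe does, not Hive code:

```python
# Simplified model of splitting a delimited line while honouring an
# escape character, as the ESCAPED BY clause does (not Hive code).
def split_escaped(line: str, delim: str = ",", esc: str = "\\"):
    fields, current, i = [], "", 0
    while i < len(line):
        ch = line[i]
        if ch == esc and i + 1 < len(line):
            current += line[i + 1]  # take the escaped character literally
            i += 2
        elif ch == delim:
            fields.append(current)  # unescaped delimiter ends the field
            current = ""
            i += 1
        else:
            current += ch
            i += 1
    fields.append(current)
    return fields

assert split_escaped("aaaa,bbbb") == ["aaaa", "bbbb"]
assert split_escaped(r"aaa\,bbb,cc") == ["aaa,bbb", "cc"]
```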

Custom line breaks

There is also a syntax that allows records to be split with a character other than the new line:

CREATE TABLE table_lines(
    s1 STRING,
    s2 STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '|';
However, it is currently not supported (I tried it on Hive 1.2.1):

Error: Error while compiling statement: FAILED: SemanticException 5:20 LINES TERMINATED BY only supports newline '\n' right now. Error encountered near token ''|'' (state=42000,code=40000)

Binary formats

Generally speaking, text files should rather be avoided on Hadoop, because binary columnar formats usually give better performance. Nevertheless, CSV or other plain text files can quite often be found as input from external systems. In such cases it is good to have different formatting options, so you can easily start using Hive in an existing ecosystem without too much hassle.

Hive partition filtering

Some time ago I found a pitfall related to partition filtering and user-defined functions.
Let’s assume we have a partitioned table that looks like this:

CREATE TABLE pageviews(
    ... /columns/
)
PARTITIONED BY (v_date STRING)
... /other properties/

Now let’s assume we usually access yesterday’s data, so for convenience we decided to create a view that contains only page views from yesterday’s partition:

CREATE VIEW yesterday_pageviews AS
SELECT * FROM pageviews
WHERE v_date = date_add(from_utc_timestamp(unix_timestamp()*1000,'UTC'),-1)

You can check that the expression used in the where clause works as intended: it always returns the current date minus 1 day.

hive> SELECT date_add(from_utc_timestamp(unix_timestamp()*1000,'UTC'),-1);
Time taken: 0.422 seconds, Fetched: 1 row(s)

This view (yesterday_pageviews) returns correct records, but the problem is that it’s very slow. As you can see in the explain plan, the amount of data suggests that the whole table is read instead of a single partition.

> EXPLAIN select count(*) from yesterday_pageviews;
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
            alias: test_table
            Statistics: Num rows: 505000 Data size: 50500000 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (v_date = date_add(Converting field (unix_timestamp() * 1000) from UTC to timezone: 'UTC', -1)) (type: boolean)
              Statistics: Num rows: 252500 Data size: 25250000 Basic stats: COMPLETE Column stats: NONE
              Select Operator

As you can see, the where clause was moved down to the filter operator, and ~50MB is the total table size:

$ hdfs dfs -du -s /apps/hive/warehouse/test_table/
50500000  /apps/hive/warehouse/test_table

Deterministic functions

The root cause of this problem is the unix_timestamp() function, which is declared as non-deterministic. See the source on GitHub:

@UDFType(deterministic = false)
@Description(name = "unix_timestamp",
 value = "_FUNC_([date[, pattern]]) - Returns the UNIX timestamp",
 extended = "Converts the current or specified time to number of seconds "
 + "since 1970-01-01.")
public class GenericUDFUnixTimeStamp extends GenericUDFToUnixTimeStamp {

deterministic = true means that each call (within one query execution) returns the same value. Because unix_timestamp is not deterministic, Hive cannot evaluate it in advance to compute the partition key, so it filters row by row. This, of course, causes a full table scan.
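A toy model of the planner’s reasoning (not Hive internals) shows why determinism matters: a deterministic predicate can be evaluated once, up front, to select partitions, while a non-deterministic one forces a scan of everything:

```python
# Toy model of partition pruning: deterministic predicates can be
# pre-evaluated to pick partitions; non-deterministic ones cannot.
def prune_partitions(partitions, predicate, deterministic):
    if deterministic:
        wanted = predicate()  # evaluate once, before reading any data
        return [p for p in partitions if p == wanted]
    return partitions         # must scan all partitions, filter row by row

parts = ["2015-06-01", "2015-06-02", "2015-06-03"]
assert prune_partitions(parts, lambda: "2015-06-03", True) == ["2015-06-03"]
assert prune_partitions(parts, lambda: "2015-06-03", False) == parts
```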


The real problem is that prior to Hive 1.2 you don’t have many options: unix_timestamp() is the only function that gives the current date/time. You can try creating your own UDF declared as deterministic, and it should work as expected.
In Hive 1.2 there are two new functions:

  • current_date
  • current_timestamp

and both are declared as deterministic, so they can be used in this scenario. In the explain section below you can see that only one partition is read:

hive> CREATE OR REPLACE VIEW yesterday_pageviews AS
    > SELECT * FROM pageviews WHERE v_date=date_add(current_date(),-1);
hive> EXPLAIN SELECT count(*) FROM yesterday_pageviews;
  Stage: Stage-1
        Reducer 2 <- Map 1 (SIMPLE_EDGE)
      DagName: root_20150617215157_ebaf02bb-0d27-4429-89fa-7f4292774c4b:1
        Map 1
            Map Operator Tree:
                  alias: test_table
                  filterExpr: (v_date = '2015-06-03') (type: boolean)
                  Statistics: Num rows: 1 Data size: 10100000 Basic stats: PARTIAL Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 1 Data size: 10100000 Basic stats: PARTIAL Column stats: NONE
                    Group By Operator

It’s worth mentioning that the unix_timestamp behaviour was described in the HIVE-10728 issue, and this function will be redefined as deterministic (and marked as deprecated) in future releases.

Hive Virtual Columns

Like many other databases, Hive provides virtual columns – columns that are not stored in the data files but can be accessed in queries. Usually they provide some metadata that can be very handy.
In this post I will describe what virtual columns are available in Hive 1.0 and show how they work with several examples.

Sample data set

Let’s generate some data first. Our table will store only one column with a single character, and there will be 200,000,000 records. I wanted the records to be as short as possible. Our data file will occupy 400,000,000 bytes, which is 3 HDFS blocks.
The data was generated in the following way:

$ cat /dev/urandom | tr -cd '0-9a-zA-Z' | fold -b -w 1 | head -n 200000000 > one_char.txt

The file content looks like this:

$ head one_char.txt

Now, let’s create a few Hive tables with different formats. The basic table is defined in the following way:

CREATE TABLE jp_one_char(c string);

After loading the file generated previously let’s populate the parquet table:

CREATE TABLE jp_one_char_parquet(c string) STORED AS PARQUET;
INSERT INTO TABLE jp_one_char_parquet
SELECT * FROM jp_one_char;

ORC table:

CREATE TABLE jp_one_char_orc(c string) STORED AS ORC;
INSERT INTO TABLE jp_one_char_orc
SELECT * FROM jp_one_char;

and sequence file table:

CREATE TABLE jp_one_char_seq(c string) STORED AS SEQUENCEFILE;
INSERT INTO TABLE jp_one_char_seq
SELECT * FROM jp_one_char;

Virtual columns

Please note that the virtual column names contain a double underscore (“__”) as the word separator.
INPUT__FILE__NAME – this is pretty straightforward. When displaying this column you will see the file on HDFS that contains the data. This can be useful when debugging: you can find the file that contains some strange data, for example. But it may also be helpful in finding the file location for an external table or partition (otherwise you would have to examine the table/partition properties: DESCRIBE EXTENDED … PARTITION(…)).
BLOCK__OFFSET__INSIDE__FILE – according to the documentation it should give the offset of the data block. It seems, however, that this behaviour depends on the file format.
For example, in a text file (compressed or not) this column displays the offset of the record in the file.

> SELECT BLOCK__OFFSET__INSIDE__FILE, c
> FROM jp_one_char
> LIMIT 20;
| block__offset__inside__file  | jp_one_char.c  |
| 0                            | c              |
| 2                            | U              |
| 4                            | 9              |
| 6                            | Z              |
| 8                            | k              |
| 10                           | C              |
| 12                           | W              |
| 14                           | 0              |
| 16                           | m              |
| 18                           | i              |
| 20                           | 5              |
| 22                           | z              |
| 24                           | T              |
| 26                           | Z              |
| 28                           | P              |
| 30                           | 3              |
| 32                           | 2              |
| 34                           | s              |
| 36                           | 4              |
| 38                           | e              |

Each row occupies 2 bytes, which is correct (one char + a new line). So, as you can see, this column holds the offset of the row. Let’s make sure that there are no duplicates within the file:

> SELECT offset, count(*)
> FROM (
>   SELECT BLOCK__OFFSET__INSIDE__FILE AS offset
>   FROM jp_one_char
> ) t
> GROUP BY offset
> HAVING count(*) > 1;
| offset  | _c1  |
No rows selected (693.016 seconds)

No duplicates, so the offset is unique within a file, as expected. That’s nice, because we can use it to create row IDs.
But you have to be careful, as it depends on the file format. In Parquet, ORC and sequence files BLOCK__OFFSET__INSIDE__FILE is not unique within a file, and it’s hard to determine what it actually means.
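For the text file case, the offsets can be reproduced outside Hive; a minimal sketch of how the reader derives them as byte positions of record starts:

```python
# Minimal sketch: BLOCK__OFFSET__INSIDE__FILE for a text file is simply
# the byte position at which each newline-terminated record starts.
def record_offsets(data: bytes):
    offsets, pos = [], 0
    for line in data.split(b"\n")[:-1]:  # drop the empty tail after the last \n
        offsets.append(pos)
        pos += len(line) + 1             # record bytes + the newline byte
    return offsets

# Each one-character record plus its newline occupies 2 bytes.
assert record_offsets(b"c\nU\n9\nZ\n") == [0, 2, 4, 6]
```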
ROW__OFFSET__INSIDE__BLOCK – To access this column you have to enable it first by setting:

set hive.exec.rowoffset=true;

but it always produced 0 anyway.
GROUPING__ID – it can be helpful when dealing with GROUP BY … WITH CUBE/ROLLUP queries. This column provides the binary value of the grouping set (0 if the given grouping column is aggregated together, 1 if the grouping column has a value).
Let’s create another sample table:

> select * from jp_grouping_sample;
| jp_grouping_sample.c1  | jp_grouping_sample.v1  | jp_grouping_sample.v2  | jp_grouping_sample.value  |
| a                      | 1                      | 1                      | 1234                      |
| a                      | 2                      | 1                      | 404                       |
| a                      | 2                      | 1                      | 40                        |
| b                      | 1                      | 1                      | 0                         |
| b                      | 1                      | 2                      | 10                        |
5 rows selected (0.768 seconds)

When we GROUP BY … WITH CUBE, we can use GROUPING__ID to get the binary value of the columns that were used:

> SELECT GROUPING__ID, c1, v1, v2,  count(*)
> FROM jp_grouping_sample
> GROUP BY c1, v1, v2 WITH CUBE;
| grouping__id  |  c1   |  v1   |  v2   | _c4  |
| 0             | NULL  | NULL  | NULL  | 5    |
| 4             | NULL  | NULL  | 1     | 4    |
| 4             | NULL  | NULL  | 2     | 1    |
| 2             | NULL  | 1     | NULL  | 3    |
| 6             | NULL  | 1     | 1     | 2    |
| 6             | NULL  | 1     | 2     | 1    |
| 2             | NULL  | 2     | NULL  | 2    |
| 6             | NULL  | 2     | 1     | 2    |
| 1             | a     | NULL  | NULL  | 3    |
| 5             | a     | NULL  | 1     | 3    |
| 3             | a     | 1     | NULL  | 1    |
| 7             | a     | 1     | 1     | 1    |
| 3             | a     | 2     | NULL  | 2    |
| 7             | a     | 2     | 1     | 2    |
| 1             | b     | NULL  | NULL  | 2    |
| 5             | b     | NULL  | 1     | 1    |
| 5             | b     | NULL  | 2     | 1    |
| 3             | b     | 1     | NULL  | 2    |
| 7             | b     | 1     | 1     | 1    |
| 7             | b     | 1     | 2     | 1    |
20 rows selected (41.555 seconds)

For example, the last row has 7 in GROUPING__ID, because all 3 grouping columns have values (b, 1, 2), so 2^0+2^1+2^2 = 7.
When browsing the Hive sources I noticed two other virtual columns, but it was hard to find any information about them on the web:
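The bit arithmetic can be sketched as follows (a toy model of the semantics shown in the output above, with column 0 as the lowest bit):

```python
# Toy model of GROUPING__ID as shown above: bit i is set when grouping
# column i has a concrete value in the row, column 0 being the lowest bit.
def grouping_id(row_values):
    return sum(2**i for i, v in enumerate(row_values) if v is not None)

assert grouping_id([None, None, None]) == 0  # grand total row
assert grouping_id(["b", 1, 2]) == 7         # 2**0 + 2**1 + 2**2
assert grouping_id([None, None, 1]) == 4     # only the third column present
```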
ROW__ID – This one is interesting. It gives NULL in all tables except for table in ORC format:

> SELECT ROW__ID, * FROM jp_one_char_orc LIMIT 5;
|                   row__id                   | jp_one_char_orc.c  |
| {"transactionid":0,"bucketid":0,"rowid":0}  | w                  |
| {"transactionid":0,"bucketid":0,"rowid":1}  | k                  |
| {"transactionid":0,"bucketid":0,"rowid":2}  | Y                  |
| {"transactionid":0,"bucketid":0,"rowid":3}  | 2                  |
| {"transactionid":0,"bucketid":0,"rowid":4}  | V                  |
5 rows selected (0.691 seconds)

In an ORC table ROW__ID returns a structure with three values: transactionid, bucketid and rowid. I couldn’t find much information on those values, but it seems that bucketid is the number of the HDFS block, and rowid is an ID that is unique within an HDFS block.
RAW__DATA__SIZE – this one actually was not visible in Hive (1.0). Maybe it is still not implemented, or some additional property has to be set?


  • Hive Documentation on Virtual Columns;
  • Programming Hive, Edward Capriolo, Dean Wampler, Jason Rutherglen;
  • Apache Hive

Hive gotcha – schema in partitioned tables

I guess it can be a surprise for quite a few people that the schema of a partitioned table in Hive is also defined at the partition level. Let’s have a look at this example.

Table schema

In Hive you can change the schema of an existing table. Let’s say you have a table:

create table sample_table (
 ts string,
 value int
)
row format delimited fields terminated by '\t';

We will focus on the second column, named value. We load a few records into this table. The file looks like this:

2015-01-01 10:01:23     140.23
2015-01-01 10:01:23     70.5
2015-01-01 10:01:23     123

The second column has some decimal values, but we have defined this column as an integer, so we won’t see the decimal part:

hive> select * from sample_table;
2015-01-01 10:01:23    140
2015-01-01 10:01:23    70
2015-01-01 10:01:23    123
Time taken: 0.103 seconds, Fetched: 3 row(s)

We have noticed our mistake, so let’s change the schema of the table by replacing int with decimal(10,2):

hive> alter table sample_table change value value decimal(10,2);
Time taken: 0.29 seconds

Now we can see the data, as expected:

hive> select * from sample_table;                               
2015-01-01 10:01:23    140.23
2015-01-01 10:01:23    70.5
2015-01-01 10:01:23    123
Time taken: 0.099 seconds, Fetched: 3 row(s)

So everything works fine, but it gets more complicated with partitioned tables.

Partitioned tables

Let’s say our table has partitions:

create table partitioned_table(
 ts            string,
 value         int
) partitioned by ( v_part string)
row format delimited fields terminated by '\t';

Let’s load the same data as previously to partition part1:

hive -e "load data local inpath 'part1.txt' into table partitioned_table partition (v_part = 'part1');"

Selecting from the table shows int values — as previously:

hive> select * from partitioned_table;
2015-01-01 10:01:23    140    part1
2015-01-01 10:01:23    70     part1
2015-01-01 10:01:23    123    part1
Time taken: 0.145 seconds, Fetched: 3 row(s)

We have noticed our mistake and we change the schema of that table:

hive> alter table partitioned_table change value value decimal(10,2);
Time taken: 0.288 seconds

So far, so good:

hive> desc partitioned_table;
ts                      string                                      
value                   decimal(10,2)                               
v_part                  string                                      
# Partition Information         
# col_name                data_type               comment             
v_part                  string                                      
Time taken: 0.224 seconds, Fetched: 8 row(s)

However when we select that data, we get int values again (as if the schema didn’t change):

hive> select * from partitioned_table;
2015-01-01 10:01:23    140    part1
2015-01-01 10:01:23    70     part1
2015-01-01 10:01:23    123    part1
Time taken: 0.135 seconds, Fetched: 3 row(s)

Partition schema

This can be a bit counter-intuitive. The schema for a partitioned table is also defined at the partition level, so altering the schema will affect only newly created partitions. For existing partitions the old schema is used (the one that was in use when the partition was created).
Loading the same file to another partition will show the difference:

hive -e "load data local inpath 'part1.txt' into table partitioned_table partition (v_part = 'part2');"

And now we can see that the second column (named value) in the part1 partition is converted to int, whereas in partition part2 it is displayed with decimal(10,2) precision:

hive> select * from partitioned_table;
2015-01-01 10:01:23    140     part1
2015-01-01 10:01:23    70      part1
2015-01-01 10:01:23    123     part1
2015-01-01 10:01:23    140.23  part2
2015-01-01 10:01:23    70.5    part2
2015-01-01 10:01:23    123     part2
Time taken: 0.131 seconds, Fetched: 6 row(s)

It doesn’t mean that the partition schemas are totally independent. For example, having a string column in a partition and int at the table level will cause NULLs for all those string values that couldn’t be parsed as integers. First the values are read and interpreted according to the partition schema, and then they are converted according to the table schema.
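This two-step read can be modelled as follows; a simplified sketch of the behaviour described above, not Hive’s actual SerDe logic:

```python
# Simplified model of the two-step read: parse with the partition's
# schema first, then convert to the table's schema (not Hive code).
from decimal import Decimal

def parse(raw: str, typ: str):
    if typ == "int":
        try:
            return int(float(raw))  # Hive truncates the decimal part
        except ValueError:
            return None             # unparsable values become NULL
    return Decimal(raw) if typ == "decimal" else raw

def read_value(raw, partition_type, table_type):
    v = parse(raw, partition_type)  # step 1: partition schema
    if v is None:
        return None
    return parse(str(v), table_type)  # step 2: convert to table schema

# part1 was created while the column was int: the decimals are already lost
assert read_value("140.23", "int", "decimal") == Decimal("140")
# part2 was created after the ALTER: full precision survives
assert read_value("140.23", "decimal", "decimal") == Decimal("140.23")
```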

How to deal with it?

In Hive 0.14 there is a way to change the partition schema:

alter table ... partition(...) change colname colname type;

I didn’t have a chance to test it, but according to the documentation it should work.
I haven’t found any way to change the partition schema in releases prior to 0.14 (in particular 0.13), so it seems the only way to fix it is to recreate the table.