Timestamps in Parquet on Hadoop

One of the main advantages of the open source Hadoop environment is that we are free to choose the tools that make up our Big Data platform. No matter which software distribution you decide to use, most of the time you can freely customise it by adding extra frameworks or upgrading versions on your own. This free choice of products gives great flexibility, but it can also cause a lot of difficulties when orchestrating the different parts together. In this post I'd like to share some of the problems with handling timestamps in Parquet files.
Timestamp is a commonly used and widely supported data type. You can find it in most frameworks, but it turns out that different tools can store and interpret it quite differently, which can end up in wrong results or even hours spent debugging your data workflow.

Timestamp in Hive

Hive has supported TIMESTAMP since version 0.8. Values are interpreted as timestamps in the local time zone, so the actual value is stored in the Parquet file as UTC [4]. When timestamps are read from the file, the server's time zone is applied to the value to give a local timestamp. Of course, this behaviour depends on the file format: the text file format does not imply any conversion to UTC.
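To make this concrete, here is a minimal sketch (the table and values are made up for illustration and are not taken from any particular setup):

-- Hypothetical Parquet-backed table with a TIMESTAMP column.
CREATE TABLE events_parquet (
    id         INT,
    event_time TIMESTAMP
) STORED AS PARQUET;

-- The literal is interpreted in the server's local time zone and Hive
-- normalises it to UTC when writing the Parquet file (INSERT ... VALUES
-- requires Hive 0.14 or later).
INSERT INTO TABLE events_parquet VALUES (1, '2017-07-01 12:00:00');

-- Reading it back in Hive applies the server's time zone again,
-- so the original local value is returned.
SELECT event_time FROM events_parquet;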

Timestamp in Impala

In Impala timestamps are saved in the local time zone, which is different from Hive. Because Impala-Hive interoperability has historically been very important, there are some workarounds that make coexistence of these two frameworks possible. The following impalad start-up parameter adds proper handling for timestamps in Hive-generated Parquet files:
convert_legacy_hive_parquet_utc_timestamps=true (default false) [2]
It is worth mentioning that Parquet file metadata is used to determine whether the file was created in Hive or not. The parquet-tools meta <file> command is helpful to see the creator of the file.

There is also a Hive option that allows reading Impala's files. The parameter hive.parquet.timestamp.skip.conversion is set to true by default, which means that Parquet files created in Impala won't have the time zone conversion applied, because the timestamps are already saved in the local time zone.
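If you ever need to check or override this behaviour for a single session, it is an ordinary Hive property; a quick sketch:

-- Print the current value of the property.
SET hive.parquet.timestamp.skip.conversion;
-- Setting it to false re-enables the time zone conversion even for Impala-written files.
SET hive.parquet.timestamp.skip.conversion=false;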

Timestamp in Spark

Spark-Hive interoperability is fine: every time we read a timestamp column we get the correct timestamp. The problem begins when we read tables created in Impala from Spark. In that case Spark applies the server time zone to a file which already contains local timestamps, and as a result we get shifted timestamps.
The main problem is that Spark (up to the newest version, 2.2.0) doesn't provide any special handling for Impala Parquet files. So whenever we have scripts in Impala that process data later used in Spark, we need to stay aware of these problems. Keep in mind that there are various Impala and Hive parameters that can influence the timestamp adjustments.

Sqoop

There are even more problems if we add Sqoop to the workflow. Sqoop stores timestamps in Parquet as INT64, which makes the imported Parquet file incompatible with Hive and Impala: these two tools will return errors when reading Sqoop's Parquet files with timestamps. The funny thing is that Spark will read such a file correctly, without any problems.

Timestamp in Parquet

Parquet is one of the most popular columnar formats and is supported by most of the processing engines available on Hadoop. Its physical data types include only BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE and BYTE_ARRAY [1]. Timestamp is defined as a logical type (TIMESTAMP_MILLIS, TIMESTAMP_MICROS) [5], but since Impala stores timestamps with up to nanosecond precision, it was decided to use INT96. Other frameworks followed Impala in using INT96, but time zone interpretation compatibility was somehow missed.

In SQL database

So, how is it done in SQL databases?
In Oracle, for example, we have TIMESTAMP for storing a timestamp without time zone information but with a defined precision, TIMESTAMP WITH TIME ZONE, and TIMESTAMP WITH LOCAL TIME ZONE (where timestamps are stored in the database time zone and converted to the session time zone when returned to the client) [7].
In Postgres we have two options as well: TIMESTAMP (without time zone information) and TIMESTAMP WITH TIME ZONE (which is stored as UTC and converted to the local time zone on read) [8].
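To make the difference concrete, here is a small Postgres sketch (table and column names are illustrative):

-- The same literal behaves differently depending on the declared type.
CREATE TABLE ts_demo (
    plain   TIMESTAMP,                 -- stored as given, never converted
    with_tz TIMESTAMP WITH TIME ZONE   -- normalised to UTC on write
);

SET timezone = 'Europe/Warsaw';
INSERT INTO ts_demo VALUES ('2017-07-01 12:00:00', '2017-07-01 12:00:00');

-- with_tz is converted to the session time zone, so it shifts;
-- plain is returned exactly as it was written.
SET timezone = 'UTC';
SELECT * FROM ts_demo;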
It could be helpful to have such a choice on Hadoop.

Conclusion

Because Hadoop is an open ecosystem with multiple independently developed components, it is sometimes possible to find areas where the components are incompatible with each other. Handling timestamps is a good example of such a problem. Although there are some workarounds for this known issue, some cases are still quite hard to detect and overcome. It is also a good example of the typical difficulties of a complex and open environment when compared to a product designed and developed by a single vendor.

References

 

Working day vs. weekend page views

Two months ago Stack Overflow published an interesting blog post on programming languages and whether people are more likely to ask questions about them during the week or on weekends. It gives some overview of how widely particular languages are spread in business (week) and hobby (weekend) projects. From their analysis we can see that, for example, T-SQL, PowerShell and Oracle are used mostly during the week, whereas Haskell, assembly and C are used on weekends.

On Wikipedia…

I was interested in checking the same thing using Wikipedia page view data. Of course, with Wikipedia it is a bit different. When someone learns a programming language they don't usually read about it on Wikipedia, but rather find a tutorial or look for answers on Stack Overflow. In some cases, however, Wikipedia can be the main source of knowledge, especially when someone looks for the theoretical aspects of programming or technology.
I checked several articles from different categories: databases, programming and data science. I looked at page views of the English Wikipedia since September 2016. For each article I computed a weekend-to-week ratio (average page views during the weekend / average page views during working days).
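The ratio itself is straightforward to compute; here is a sketch in SQL over a hypothetical pageviews table (article, view_date, views), not the actual pipeline I used:

-- date_format's 'u' pattern gives the day of week as 1 (Monday) .. 7 (Sunday).
SELECT
    article,
    AVG(CASE WHEN date_format(view_date, 'u') IN ('6', '7') THEN views END)
  / AVG(CASE WHEN date_format(view_date, 'u') NOT IN ('6', '7') THEN views END)
      AS weekend_to_week_ratio
FROM pageviews
WHERE view_date >= '2016-09-01'
GROUP BY article;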

Database

The database category shows something interesting: there is a difference between applied and theoretical concepts. For example, the Slowly changing dimension article is more work-related than the articles on normalisation and normal forms. On the other end of the scale there is Blockchain, which is the most 'weekend' page in this section.

Data science

In the data science section there is an interesting observation: Deep learning itself and the various modern frameworks usually related to deep learning and neural networks are much more 'weekend' articles than the older machine learning algorithms.

Programming

As mentioned above, reading about a programming language on Wikipedia is not really a sign that the language is used in projects. More likely, people will check some detail about it when they hear the name for the first time. Nevertheless, there are some interesting facts. As in the Stack Overflow report, Haskell seems to attract more people during weekends. On the other hand, it has a similar ratio to Java, so this is probably not the best indicator of how popular a given language is in business. Design patterns are more work-related than some theoretical articles related to functional programming or internals (garbage collection or stack buffer overflow).

Surprisingly, Scala seems to be read more often during working days than the other languages that I checked.

Hive – Selecting columns with regular expression

Hive has a rather unique feature that allows selecting columns by regular expression instead of by name.
It's very useful when we need to select all columns except one. In most SQL databases we would have to list all the columns, but in Hive this feature can save us the typing.
Let's say there is a people table with columns name, age, city, country and created_at. To select all columns except created_at we can write:

set hive.support.quoted.identifiers=none;
select
    `(created_at)?+.+`
from people
limit 10;

This is equivalent to:

select
    name, age, city, country
from people
limit 10;

Please note that in Hive 0.13 or later you have to set hive.support.quoted.identifiers to none.
I have never seen such functionality in other SQL databases.

References

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

Spark SQL

This is one of the Hive-specific features that are not available in Spark SQL.

Hadoop user name

Some time ago I was looking for this option:
The environment variable HADOOP_USER_NAME lets you specify the username that will be used when connecting to Hadoop, for example when creating new HDFS files or accessing existing data.
Let's have a look at a short example:

[root@sandbox ~]# echo "New file" | hdfs dfs -put - /tmp/file_as_root
[root@sandbox ~]# export HADOOP_USER_NAME=hdfs
[root@sandbox ~]# echo "New file" | hdfs dfs -put - /tmp/file_as_hdfs
[root@sandbox ~]# hdfs dfs -ls /tmp/file_*
-rw-r--r--   3 hdfs hdfs        154 2016-05-21 08:20 /tmp/file_as_hdfs
-rw-r--r--   3 root hdfs        154 2016-05-21 08:19 /tmp/file_as_root

So the second file (file_as_hdfs) is owned by the hdfs user, because that was the value of the HADOOP_USER_NAME variable.
Of course, this works only on a Hadoop cluster without Kerberos, but it's still very useful on a test environment or a VM: you can act as many different users without executing sudo commands all the time.

Hive gotchas – Order By

There is this one feature in Hive that I really hate: ORDER BY col_index.
Historically, the ORDER BY clause accepted only column aliases, as in the simple example below:

select id, name from people order by name;
+------------+--------------+--+
| people.id  | people.name  |
+------------+--------------+--+
| 5          | Jimmy        |
| 2          | John         |
| 1          | Kate         |
| 4          | Mike         |
| 3          | Sam          |
+------------+--------------+--+

In other relational databases it is possible to give not only a column alias but also a column index; it is much simpler to say "column 3" than to type the whole name or alias. This option was not supported in Hive at the beginning, but the community noticed that and a ticket was created.
Since Hive 0.11.0 it is possible to order the result by column index as well; however, there is a gotcha here. There is a property that enables this new option: hive.groupby.orderby.position.alias must be set to 'true'. The problem is that by default it is set to 'false', and in that case you can still use numbers in the ORDER BY clause, but they are interpreted literally (as numbers), not as column indexes, which is rather strange.
So, for example, in any modern Hive version, when you run something like this, by default you get:

select id, name from people order by 2;
+------------+--------------+--+
| people.id  | people.name  |
+------------+--------------+--+
| 1          | Kate         |
| 2          | John         |
| 3          | Sam          |
| 4          | Mike         |
| 5          | Jimmy        |
+------------+--------------+--+

As you can see, by default the 2 was interpreted as the value 2, not as column number 2. After enabling the option you can change how the ORDER BY works:

set hive.groupby.orderby.position.alias=true;
select id, name from people order by 2;

+-----+--------+--+
| id  |  name  |
+-----+--------+--+
| 5   | Jimmy  |
| 2   | John   |
| 1   | Kate   |
| 4   | Mike   |
| 3   | Sam    |
+-----+--------+--+

So this time, after enabling the option, we can use the column number to sort by name. The problem is that whenever you work in Hive you have to think about whether hive.groupby.orderby.position.alias was enabled in the current session or not. This makes the feature rather impractical and limits the usage of this syntactic sugar. Moreover, I cannot really see any use case for using ORDER BY <value>.

References

Hive Order By – https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Hive table properties

It was a surprise for me to find these two properties, which are very useful when dealing with a large number of files generated by other systems. I couldn't find them in the Hive docs, but you can come across these settings on forums.
skip.header.line.count tells how many lines from the beginning of the file should be skipped. It is useful when you read CSV files whose first line contains a header with column names. It works with plain text files (ROW FORMAT DELIMITED FIELDS TERMINATED BY…) and with the CSV SerDe. There is also a complementary setting that allows skipping footer lines: skip.footer.line.count. The problem, however, is that Spark doesn't recognise those properties, so be careful when you plan to read the table later via Spark's HiveContext.
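As a sketch (the table name, columns and location are made up for illustration), a table over CSV files whose first line is a header could be declared like this:

CREATE EXTERNAL TABLE people_csv (
    name STRING,
    age  INT,
    city STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/people_csv'
TBLPROPERTIES ('skip.header.line.count'='1');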
Speaking of Hive table properties, the following setting may also be very useful.
serialization.null.format is another table property, which defines how NULL values are encoded in the text file. Here is an example of using the string "null" as the NULL marker:

CREATE TABLE table_null(
     s1 STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES('serialization.null.format'='null');

So whenever a field contains the string "null", it will be interpreted as a NULL value.
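To see it in action, here is a hypothetical check, assuming the table's text file contains the two lines abc and null:

-- With serialization.null.format='null', the literal string "null" in the file
-- is read back as a SQL NULL.
SELECT s1, s1 IS NULL AS is_null FROM table_null;
-- expected: abc    false
--           NULL   true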

Field separator escape

One more useful property for dealing with text files is escape.delim. This property allows setting a custom character that will be used to escape the separator inside column values:

CREATE TABLE table_escape(
    s1 STRING,
    s2 STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES('escape.delim'='\\');

(We use a backslash as the escape character, but it has to be escaped as in Java.)
In that case, the following data file:

aaaa,bbbb
aaa\,bbb,cc

will be interpreted as:

0: jdbc:hive2://localhost:10000> select * from table_escape
0: jdbc:hive2://localhost:10000> ;
+------------------+------------------+--+
| table_escape.s1  | table_escape.s2  |
+------------------+------------------+--+
| aaaa             | bbbb             |
| aaa,bbb          | cc               |
+------------------+------------------+--+

Custom line breaks

There is also a syntax that allows splitting records with some character other than a new line:

CREATE TABLE table_lines(
    s1 STRING,
    s2 STRING
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '|';

However, this is currently not supported (I tried it on Hive 1.2.1):

Error: Error while compiling statement: FAILED: SemanticException 5:20 LINES TERMINATED BY only supports newline '\n' right now. Error encountered near token ''|'' (state=42000,code=40000)

Binary formats

Generally speaking, text files should rather be avoided on Hadoop, because binary columnar formats usually give better performance. Nevertheless, CSV or other plain text files can quite often be found as input from external systems. In such cases it is good to have different formatting options and to be able to start using Hive in an existing ecosystem without too much hassle.
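One common pattern, sketched here with the hypothetical people_csv table from above, is to treat the text table as a staging area and rewrite it into a columnar format for further processing:

-- Convert the raw CSV staging table into Parquet for faster downstream queries.
CREATE TABLE people_parquet
STORED AS PARQUET
AS SELECT name, age, city FROM people_csv;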
 
 

Apache Drill

Last week I spent some time playing with Apache Drill and I really liked it. It seems that the tool has seen significant development over the last year, and its coverage of SQL functionality is wide enough to migrate from Hive (at least in many typical cases).
I had a chance to give a short introductory presentation, which is available here:
Introduction to Apache Drill
It seems there are still some issues that may block introducing Drill into existing production environments.
First of all, it doesn't support YARN, so it may be a challenge to deploy it alongside existing Hadoop nodes. Missing Kerberos support may also be an issue for some mature production clusters. However, tickets have already been created for both features, so maybe in the next versions we will enjoy those improvements.

Big Data Landscape

There are many projects that make big data possible. If you look at the popular Cloudera or Hortonworks distributions you can see a number of tools and frameworks that are ready to fit into an existing corporate ecosystem and provide insight into the processed data.
This big data ecosystem evolves all the time, and because the huge majority of it is open source software, everyone can participate in developing it. I took a quick look at the GitHub repositories of some Hadoop-related projects and generated statistics such as the number of commits and the number of added or removed lines. This gives some picture of each project and the effort involved in making a tool more mature.
[Screenshot: number of commits to big data projects]
The projects I chose were rather arbitrary, and there are good reasons to go further and keep adding more repositories. I tried to select those tools that are usually found in Hadoop deployments or can optionally fit into existing big data environments. I divided the projects into several groups:

  • general tools
  • SQL processing tools (+Pig)
  • processing frameworks or libraries
  • big table implementations (HBase, Cassandra, Accumulo)
  • web notebooks (Hue, Zeppelin)
  • integration tools (online or batch)

Of course, you should keep in mind that this is based only on the current GitHub repositories. Some of the projects were developed earlier in different repositories (for example, Hive has much earlier history records than Hadoop itself). Besides that, some of the tools were open sourced at some point in time, whereas others were open software from the very beginning. Nevertheless, it can give some overall feeling and makes it easy to spot the projects with more intensive development.
Have a look at this page.

Custom HDFS block size

HDFS stores files split into blocks. By default blocks are 64 MB, but in production systems you can often see much larger blocks (e.g. 128 MB). This setting is configured by the dfs.block.size (or dfs.blocksize) property, usually defined in the hdfs-site.xml configuration file.

It may be surprising that the block size setting can be overridden when executing a Hadoop application. For example, when creating a new file you can specify a different block size than the system-wide default.

$ hdfs dfs -D dfs.blocksize=10m -put file.txt /user/kuba/
$ hadoop fsck /user/kuba/file.txt
...
 Total blocks (validated): 19 (avg. block size 10313284 B)
...

Of course, this applies not only to the console HDFS tools. It's perfectly fine to create a table in Hive that will be loaded with data split into custom-sized HDFS blocks:

hive> set dfs.blocksize=300m;
hive> create table test_table_small_block_size (<schema...>);
hive> insert into table test_table_small_block_size
    > select ... from other_tables;

Some limitations

dfs.blocksize must be a multiple of dfs.bytes-per-checksum, which is set to 512 bytes by default.
There is a system-wide minimum block size defined by dfs.namenode.fs-limits.min-block-size (1048576 by default), and custom block size settings must not be smaller than this value.

References

https://hadoop.apache.org/docs/r2.6.2/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
 

Oktoberfest

Oktoberfest is the largest beer and folk festival in the world. In 2013, 6.7 million people visited the event. Looking at the total page views on Wikipedia, it looks like people read about Oktoberfest mainly in German and English. Page views in other languages, especially those of neighbouring countries, were much lower:
[Chart: oktoberfest]

Breweries

It may be surprising, but only beer brewed by local Munich breweries can be served during Oktoberfest. There are six breweries that meet all the criteria, and among them Löwenbräu seems to be the most popular on Wikipedia (especially on the English pages). The Paulaner brewery also has a high position in both languages, and it shows a significant peak of popularity at the beginning of Oktoberfest (along with Hacker-Pschorr).
[Chart: oktoberfest_breweries]

Other Oktoberfest-related readings

There are many other articles on Wikipedia that are linked from the Oktoberfest page. Some of them (especially those related to the event) share common popularity patterns with the main article. Lederhosen – the leather breeches worn at Oktoberfest – is one example. Besides that, Maß, the 1-litre beer mug, was also quite popular:
[Chart: oktoberfest_other]
There is one more example of an article that got many page views at the beginning of Oktoberfest. That's Reinheitsgebot, the so-called beer purity law, a regulation which limits the ingredients that can be used in beer:
[Chart: oktoberfest_purity]

Conclusions

One important remark at the end: hundreds or thousands of page views is a rather small number compared with the 6.7 million visitors of Oktoberfest (2013), so any conclusions based only on Wikipedia data can be highly inaccurate.