Hive partition filtering

Some time ago I found one pitfall related to partition filtering and user defined functions.

Let’s assume we have partitioned table that looks like this:

CREATE TABLE pageviews(
... 
) PARTITIONED BY (v_date STRING)
... /other properties/
;

Now let’s assume we usually access yesterday data, so for convenience we decided to create view that contains only page views from yesterday partition:

CREATE VIEW yesterday_pageviews AS
SELECT * FROM pageviews
WHERE v_date = date_add(from_utc_timestamp(unix_timestamp()*1000,'UTC'),-1)

You can check that the expression used in where clause works as intended. It always returns the current date minus 1 day.

hive> SELECT date_add(from_utc_timestamp(unix_timestamp()*1000,'UTC'),-1);
OK
2015-06-16
Time taken: 0.422 seconds, Fetched: 1 row(s)

This view (yesterday_pageviews) returns correct records, but the problem is that it’s very slow. As you can see in the explain plan the amount of data suggest that the whole table is read instead of single partition.

> EXPLAIN select count(*) from yesterday_pageviews;
...
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: test_table
            Statistics: Num rows: 505000 Data size: 50500000 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (v_date = date_add(Converting field (unix_timestamp() * 1000) from UTC to timezone: 'UTC', -1)) (type: boolean)
              Statistics: Num rows: 252500 Data size: 25250000 Basic stats: COMPLETE Column stats: NONE
              Select Operator

As you can see, where clause where moved down to filter operator, and ~50MB is the total table size:

$ hdfs dfs -du -s /apps/hive/warehouse/test_table/
50500000  /apps/hive/warehouse/test_table

Deterministic functions

The root cause for this problem is unix_timestamp() function which is declared as not deterministic. See the source on github:

@UDFType(deterministic = false)
@Description(name = "unix_timestamp",
 value = "_FUNC_([date[, pattern]]) - Returns the UNIX timestamp",
 extended = "Converts the current or specified time to number of seconds "
 + "since 1970-01-01.")
public class GenericUDFUnixTimeStamp extends GenericUDFToUnixTimeStamp {
...

Deterministic=true means that each call (within one query execution) should return the same value. Because unix_timestamp is not deterministic Hive cannot execute it in advance to get the partition key and it will try to filter row-by-row. This of course causes a full table scan.

Solution

The real problem is that prior to Hive 1.2 you don’t have many options. unix_timestamp() is the only function that gives the current date/time. You can try creating your own UDF defined as deterministic and it should work as expected.

In Hive 1.2 there are two new functions:

  • current_date
  • current_timestmap

and both are declared as deterministic, so they can be used in this scenario. In the explain section below you can see that only one partition is read:

hive> CREATE OR REPLACE VIEW yesterday_pageviews AS 
    > SELECT * FROM pageviews WHERE v_date=date_add(current_date(),-1);

...

hive> EXPLAIN SELECT count(*) FROM yesterday_pageviews;

...

STAGE PLANS:
  Stage: Stage-1
    Tez
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE)
      DagName: root_20150617215157_ebaf02bb-0d27-4429-89fa-7f4292774c4b:1
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: test_table
                  filterExpr: (v_date = '2015-06-03') (type: boolean)
                  Statistics: Num rows: 1 Data size: 10100000 Basic stats: PARTIAL Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 1 Data size: 10100000 Basic stats: PARTIAL Column stats: NONE
                    Group By Operator

It’s worth mentioning that unix_timestamp behaviour was described in HIVE-10728 issue and this function will become redefined as deterministic (and will be marked as deprecated) in the future releases.

Hive Virtual Columns

Like many other databases, Hive provides virtual columns – columns that are not stored in the datafiles but can be accessed in queries. Usually they provide some metadata that can be very handy.

In this post I will describe what virtual columns are available in Hive 1.0 and show how it works on several examples.

Sample data set

Let’s generate some data first. In our table we will store only one column that has one character and there will be 200,000,000 records. I wanted to have the records as shortest as possible. Our data file will occupy 400,000,000 bytes so that are 3 HDFS blocks.

The data was generated in the following way:

$ cat /dev/urandom | tr -cd '0-9a-zA-Z' | fold -b -w 1 | head -n 200000000 > one_char.txt

The file content looks like that:

$ head one_char.txt
c
U
9
Z
k
C
W
0
m
i

Now, let’s create few Hive tables with different formats. Basic table is defined in the following way:

CREATE TABLE jp_one_char(c string) 
ROW FORMAT DELIMITED;

After loading the file generated previously let’s populate the parquet table:

CREATE TABLE jp_one_char_parquet(c string) 
STOREAD AS PARQUET;

INSERT INTO TABLE jp_one_char_parquet 
SELECT * FROM jp_one_char;

ORC table:

CREATE TABLE jp_one_char_orc(c string) 
STORED AS ORC;

INSERT INTO TABLE jp_one_char_orc
SELECT * FROM jp_one_cha

and sequence file table:

CREATE TABLE jp_one_char_seq(c string) 
STORED AS SEQUENCEFILE;

INSERT INTO TABLE jp_one_char_seq
SELECT * FROM jp_one_char

Virtual columns

Please not that the virtual columns contains double underscore (“_ _”) as word separator in their names.

INPUT__FILE__NAME – this is pretty straight-forward. When displaying this column you will see the file on HDFS that contains the data. This can be useful, when debugging – you can find the file that contains some strange data, for example. But it may also be helpful in finding file location for external table or partition (otherwise you would have to examine table/partition properties: DESCRIBE EXTENDED … PARTITION(…).

BLOCK__OFFSET__INSIDE__FILE – according to documentation it should get the offset of the data block. It seems however that this behavior depends on the file format.

For example in text file (compressed or not) this columns display the offset of the record in the file.

> SELECT BLOCK__OFFSET__INSIDE__FILE, *
> FROM jp_one_char
> LIMIT 20;
+------------------------------+----------------+--+
| block__offset__inside__file  | jp_one_char.c  |
+------------------------------+----------------+--+
| 0                            | c              |
| 2                            | U              |
| 4                            | 9              |
| 6                            | Z              |
| 8                            | k              |
| 10                           | C              |
| 12                           | W              |
| 14                           | 0              |
| 16                           | m              |
| 18                           | i              |
| 20                           | 5              |
| 22                           | z              |
| 24                           | T              |
| 26                           | Z              |
| 28                           | P              |
| 30                           | 3              |
| 32                           | 2              |
| 34                           | s              |
| 36                           | 4              |
| 38                           | e              |
+------------------------------+----------------+--+

Each row occupies 2 bytes, which is correct (one char + new line). So, as you can see this column holds the offset of the row. Let’s make sure that there are no duplicates within the file:

> SELECT offset, count(*)
> from (
>   SELECT BLOCK__OFFSET__INSIDE__FILE offset, *
>   FROM jp_one_char
> ) t
> GROUP BY offset
> HAVING COUNT(*) >1;

...

+---------+------+--+
| offset  | _c1  |
+---------+------+--+
+---------+------+--+
No rows selected (693.016 seconds)

No duplicates, so this is unique within a file, as expected That’s nice, because we can use it to create row IDs.

But you have to be careful, as it depends on file format. In parquet, ORC and sequence file BLOCK__OFFSET__INSIDE__FILE is not unique within file and it’s hard to determine what it actually means.

ROW__OFFSET__INSIDE__BLOCK – To access this column you have to enable it first by setting:

set hive.exec.rowoffset=true;

but it always produced 0, anyway.

GROUPING__ID – It can be helpful when dealing with GROUP BY … WITH CUBE/ROLLUP queries. This column provides binary value of grouping set (0 if given grouping column is aggregated together, 1 if grouping column has value).

Let’s create another sample table:

> select * from jp_grouping_sample;
+------------------------+------------------------+------------------------+---------------------------+--+
| jp_grouping_sample.c1  | jp_grouping_sample.v1  | jp_grouping_sample.v2  | jp_grouping_sample.value  |
+------------------------+------------------------+------------------------+---------------------------+--+
| a                      | 1                      | 1                      | 1234                      |
| a                      | 2                      | 1                      | 404                       |
| a                      | 2                      | 1                      | 40                        |
| b                      | 1                      | 1                      | 0                         |
| b                      | 1                      | 2                      | 10                        |
+------------------------+------------------------+------------------------+---------------------------+--+
5 rows selected (0.768 seconds)

When we GROUP BY WITH CUBE, then we can use GROUPING__ID to have binary value of columns that were used:

> SELECT GROUPING__ID, c1, v1, v2,  count(*)
> FROM jp_grouping_sample
> GROUP BY c1, v1, v2 WITH CUBE;

+---------------+-------+-------+-------+------+--+
| grouping__id  |  c1   |  v1   |  v2   | _c4  |
+---------------+-------+-------+-------+------+--+
| 0             | NULL  | NULL  | NULL  | 5    |
| 4             | NULL  | NULL  | 1     | 4    |
| 4             | NULL  | NULL  | 2     | 1    |
| 2             | NULL  | 1     | NULL  | 3    |
| 6             | NULL  | 1     | 1     | 2    |
| 6             | NULL  | 1     | 2     | 1    |
| 2             | NULL  | 2     | NULL  | 2    |
| 6             | NULL  | 2     | 1     | 2    |
| 1             | a     | NULL  | NULL  | 3    |
| 5             | a     | NULL  | 1     | 3    |
| 3             | a     | 1     | NULL  | 1    |
| 7             | a     | 1     | 1     | 1    |
| 3             | a     | 2     | NULL  | 2    |
| 7             | a     | 2     | 1     | 2    |
| 1             | b     | NULL  | NULL  | 2    |
| 5             | b     | NULL  | 1     | 1    |
| 5             | b     | NULL  | 2     | 1    |
| 3             | b     | 1     | NULL  | 2    |
| 7             | b     | 1     | 1     | 1    |
| 7             | b     | 1     | 2     | 1    |
+---------------+-------+-------+-------+------+--+
20 rows selected (41.555 seconds)

For example, last row has 7 in GROUPING__ID, because all 3 grouping columns have some values (b,1,2) so 2^0+2^1+2^2 = 7.

When browsing Hive sources I noticed two other virtual columns, but it was hard to find any information on the web regarding them:

ROW__ID – This one is interesting. It gives NULL in all tables except for table in ORC format:

> SELECT ROW__ID, * FROM jp_one_char_orc LIMIT 5;
+---------------------------------------------+--------------------+--+
|                   row__id                   | jp_one_char_orc.c  |
+---------------------------------------------+--------------------+--+
| {"transactionid":0,"bucketid":0,"rowid":0}  | w                  |
| {"transactionid":0,"bucketid":0,"rowid":1}  | k                  |
| {"transactionid":0,"bucketid":0,"rowid":2}  | Y                  |
| {"transactionid":0,"bucketid":0,"rowid":3}  | 2                  |
| {"transactionid":0,"bucketid":0,"rowid":4}  | V                  |
+---------------------------------------------+--------------------+--+
5 rows selected (0.691 seconds)

In ORC table ROW__ID returns structure with three values: transactionid, bucketid and rowid. I couldn’t find much infomation on those values, but it seems that bucketid is number of HDFS block, and rowid is unique ID, but within a HDFS block.

RAW__DATA__SIZE – This one actually was not visible by Hive (1.0). Maybe still not implemented or some additional property has to be set?

References:

  • Hive Documentation on Virtual Columns;
  • Programming Hive, Edward Capriolo, Dean Vampler, Jason Rutherglen;
  • Apache Hive

Odbieranie prawa jazdy

18 maja weszła w życia zmiana przepisów, która zaostrzyła kary za wykroczenia drogowe. Najdotkliwsze dla kierowców może być odebieranie prawa jazdy na 3 miesiące w przypadku przekroczenia dopuszczalnej prędkości w terenie zabudowanym o 50 km/h. W mediach były przytaczane opinie Policji wg których miało to bezpośredni wpływ na poprawę bezpieczeństwa na drogach.

Na stronach Policji są udostępniane statystyki, które zawierają m.in. dzienną liczbę wypadków samochodowych. Jest jeszcze trochę wcześnie, żeby oceniać długofalowy wpłych nowych środków karnych na bezpieczeństwo, ale wydaje się, że w pierwszych tygodniach po wprowadzeniu nowego przepisu rzeczywiście liczba wypadków nieco zmalała, a przynajmnniej nie urosła tak jak to miało w czerwcu w poprzednich latach.

Wypadki drogowe w latach 2013-2015

Podobne zmiany można zaobserwować w liczbi rannych oraz zabitych w wypadkach samochodowych. 8 czerwca był nawet pod tym względem dniem szczególnym, bo nie było ani jednej ofiary śmiertelnej, co nie zdarza się często.

W 2014 na drogach zginęło 2676 osób.

Dane ze strony http://www.statystyka.policja.pl/.

Wybory prezydenckie (cz. 2)

Jeszcze raz chciałem wrócić do ostatnich wyborów prezydenckich. Temat już stary – wiem, że muszę trochę popracować nad tempem przygotowywania kolejnych wpisów 🙂

Udostępnione dane

Od strony technicznej wyborów ucieszyło mnie to, że Państwowa Komisja Wyborcza upubliczniła wyniki w przystępnym formacie. Rezulataty z obwodowów wyborczych (a więc dane niezagregowane) zostały udostepnione jako pliki CSV. Wielkie brawa, oby tak dalej.

Mocne strony kandydatów

W mediach wielokrotnie pojawiały się mapy pokazujące poparcie danego kandydata w różnych powiatach i gmianch. Jest to niewątpliwie ciekawa obserwacja i ten fenomen jest widocznych chyba we wszystkich wyborach, ale jest w tym pewne uproszczenie.

Jeśli spojrzymy na to jak rozkładały się wyniki kadydatów w obwodach wyborczych, to można dostrzec, że są istotne różnice pomiędzy Dudą a Komorowskim.

prezydent2_rozklad

Przede wszystkim widać, że Andrzej Duda w wielu obwodowych komisjach cieszył się poparciem przekraczającym nawet 75%. Taka poplularność dla jego rywala była praktycznie nieosiągalna. Z drugiej strony było sporo obdodów, w których Bronisław Komorowski uzyskał niewielkie poparcie (<25%), a Andrzej Duda nawet jeśli przegrywał to i tak zdobywał pewną część głosów (najczęściej więcej niż 25%). Inaczej mówiąc lokalne zwyciąstwa Komorowskiego były umiarkowane i nawet w obwodach, w których urzędujący prezydent wygrał Duda cieszył się istotnym poparciem. Była to jedna z obserwacji, która mnie zaskoczyła.

Do podobnych wniosków można dojść, gdy spojrzymy na wyniki z poszczególnych województw:

Rozkład głosów w obwodowych komisjach w drugeij turze

Jasne jest, że najmocniejsze obwody Andrzeja Dudy znajdowaly się we wschodnich województwach, np. w lubelskim, podlaskim czy podkarpackim. Bronisław Komorowski najlepsze wyniki osiągał w zachodnich województwach (np. zachodniopomorskim), ale  nawet tam prawie w żadnych obwodach nie osiągnął przytłaczającej wygranej.

Co ciekawe, niektóre województwa cechowały się w miarę jednorodnym poparciem (takie samo w każdym obwodzie). Wspomniane wcześniej województwo zachodniopomorskie ma bardzo wąski rozkład głosów, czyli niemal w każdym obwodzie wynik był taki sam. Zupełnie inaczej było w np. w podkarpackim, które też miało zdecydowanego faworyta (Duda), ale rozkład wyników jest o wiele szerszy, co pokazuje, że sympatie politycznie nie były jednorodne w tym województwie.

Frekwencja

Druga obserwacja na którą zwróciłem uwagę dotyczy frekwencji. Bronisław Komorowski zdobył więcej głosów w obwodowych komisjach, w któych zanotowano większę frekwencję. Trudno mi jednoznacznie to zinterpretować. Może po prostu w miastach jest większa frekwencja a to obwody przynoszące zwycięstwo Komorowskiemu?

prezydent2_frekwencja

“Swing states”

Pomiędzy pierwszą a druga turą nie było wielkich zmian jeśli chodzi o sympatie polityczne. Zazwyczaj w obwodach wygrywał ten sam kandydat, który był liderem w poprzedniej turze (z uwzględnieniem oczywiście tego, że np. elektorat Pawła Kukiza musiał się zdecydować na innego kandydata). Obwody, w których wygrywał Kukiz prawie dwa razy częściej przynosiły zwycięstwo Dudzie niż Komorowskiemu w ostatecznym głosowaniu.
Pomiędzy obwodami, w którcyh zwyciężali faworyci były drobne zmiany: ok. 5% obwodów zmieniło faworyta w drugiej turze i zmiana Komorowski -> Duda była prawie dwukrotnie częstsza niż Duda -> Komorowski.

Wikipedia

Na Wikipedii widać dużą przewagę Dudy podczas kulminacyjnego okresu. Zapewne część jego popularności brała się z tego, że po prostu był mniej znanym kandydatem i jego sukces dla wielu wyborców był niespodzianką. Warto zwrócić uwagę, że przed I turą ta różnica była dużo mniejsza, więc pewnie nie powinniśmy traktować Wikipediii jako wyborczej wyroczni.

prezydent2_wiki_pv