This is the documentation for Cloudera Impala 1.2.4.
Documentation for other versions is available at Cloudera Documentation.

Partitioning

By default, all the data files for a table are located in a single directory. Partitioning is a technique for physically dividing the data during loading, based on values from one or more columns, to speed up queries that test those columns. For example, with a school_records table partitioned on a year column, there is a separate data directory for each different year value, and all the data for that year is stored in a data file in that directory. A query that includes a WHERE condition such as YEAR=1966, YEAR IN (1989,1999), or YEAR BETWEEN 1984 AND 1989 can examine only the data files from the appropriate directory or directories, greatly reducing the amount of data to read and test.
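
As a rough illustration of this layout, the following Python sketch models how a filter on the partition column narrows the set of directories to read. It is not Impala code: the partition_dir and dirs_to_scan helpers and the /warehouse/school_records path are hypothetical, and the year=<value> directory naming is an assumption based on how partition directories are referred to elsewhere on this page.

```python
# Illustrative model only: maps partition key values to directory names and
# selects the directories that a filter on the partition column would need.
BASE = "/warehouse/school_records"  # hypothetical table location


def partition_dir(year):
    """Directory assumed to hold all rows for one year value."""
    return f"{BASE}/year={year}"


def dirs_to_scan(years_present, predicate):
    """Keep only the directories whose year value satisfies the predicate."""
    return [partition_dir(y) for y in sorted(years_present) if predicate(y)]


years = range(1960, 2000)  # one partition per year, 40 in total

# WHERE year = 1966
eq = dirs_to_scan(years, lambda y: y == 1966)
# WHERE year IN (1989, 1999)
in_list = dirs_to_scan(years, lambda y: y in (1989, 1999))
# WHERE year BETWEEN 1984 AND 1989 (inclusive on both ends)
between = dirs_to_scan(years, lambda y: 1984 <= y <= 1989)

print(len(eq), len(in_list), len(between))  # 1 2 6
```

In this model, each of the three conditions touches at most 6 of the 40 directories, while a query with no condition on year would have to visit all 40.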

Partitioning is typically appropriate for:

  • Tables that are very large, where reading the entire data set takes an impractical amount of time.
  • Tables that are always or almost always queried with conditions on the partitioning columns. In our example of a table partitioned by year, SELECT COUNT(*) FROM school_records WHERE year = 1985 is efficient, only examining a small fraction of the data; but SELECT COUNT(*) FROM school_records has to process a separate data file for each year, resulting in more overall work than in an unpartitioned table. You would probably not partition this way if you frequently queried the table based on last name, student ID, and so on without testing the year.
  • Columns that have reasonable cardinality (number of different values). If a column only has a small number of values, for example Male or Female, you do not gain much efficiency by eliminating only about 50% of the data to read for each query. If a column has only a few rows matching each value, the number of directories to process can become a limiting factor, and the data file in each directory could be too small to take advantage of the Hadoop mechanism for transmitting data in multi-megabyte blocks. For example, you might partition census data by year, store sales data by year and month, and web traffic data by year, month, and day. (Some users with high volumes of incoming data might even partition down to the individual hour and minute.)
  • Data that already passes through an extract, transform, and load (ETL) pipeline. The values of the partitioning columns are stripped from the original data files and represented by directory names, so loading data into a partitioned table involves some sort of transformation or preprocessing.
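
The cardinality trade-off described above can be made concrete with some back-of-the-envelope arithmetic. The sketch below is only an approximation under a stated assumption: rows are spread evenly across the distinct values of the column, which real data rarely is. The cardinality_report function, the table size, and the column cardinalities are all hypothetical.

```python
def cardinality_report(total_rows, distinct_values):
    """Summarize partitioning on a column, assuming rows are spread evenly
    across its distinct values (an idealization)."""
    return {
        "directories": distinct_values,
        "rows_per_partition": total_rows / distinct_values,
        # Share of the table skipped by a filter on a single value.
        "fraction_skipped": 1 - 1 / distinct_values,
    }


total_rows = 1_000_000_000  # hypothetical table size

for column, n in [("gender", 2), ("year", 50), ("student_id", 200_000_000)]:
    r = cardinality_report(total_rows, n)
    print(
        f"{column:10} dirs={r['directories']:>11,} "
        f"rows/partition={r['rows_per_partition']:>13,.0f} "
        f"skipped={r['fraction_skipped']:.1%}"
    )
```

With two values, a single-value filter skips only half of the data. With 200 million values, each partition holds about 5 rows, so the directory count dominates the cost. A column such as year falls between the two extremes.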

See Attaching an External Partitioned Table to an HDFS Directory Structure for an example that illustrates the syntax for creating partitioned tables, the underlying directory structure in HDFS, and how to attach a partitioned Impala external table to data files stored elsewhere in HDFS.

Parquet is a popular format for partitioned Impala tables because it is well suited to handle huge data volumes. See Query Performance for Impala Parquet Tables for performance considerations for partitioned Parquet tables.

See NULL for details about how NULL values are represented in partitioned tables.

SQL Statements for Partitioned Tables

In terms of Impala SQL syntax, partitioning affects these statements:

  • CREATE TABLE: You specify a PARTITIONED BY clause when creating the table to identify the names and data types of the partitioning columns. These columns are not included in the main list of columns for the table.
  • ALTER TABLE: You can add or drop partitions, to work with different portions of a huge data set. With data partitioned by date values, you might "age out" data that is no longer relevant.
  • INSERT: When you insert data into a partitioned table, you identify the partitioning columns. The partition key values from each inserted row are not stored in the data files; instead, they determine the directory where that row is stored. You can also specify which partition to load a set of data into by naming it in a PARTITION clause: INSERT OVERWRITE replaces the contents of that partition, while INSERT INTO, as in the examples on this page, adds rows to it.
  • SELECT: Although the syntax of the SELECT statement is the same whether or not the table is partitioned, the way queries interact with partitioned tables can have a dramatic impact on performance and scalability. The mechanism that lets queries skip certain partitions during a query is known as partition pruning; see Partition Pruning for Queries for details.
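
To picture what happens to the partition key values during an INSERT, here is a minimal Python model. It is a sketch, not a description of Impala internals: the route_rows function is hypothetical, and the col=value directory naming is assumed from the way partition directories are named elsewhere on this page.

```python
from collections import defaultdict


def route_rows(rows, partition_cols):
    """Group rows by partition key; the key columns are removed from the
    row that would be written to the data file."""
    files = defaultdict(list)
    for row in rows:
        # Build a directory name such as "year=2010" from the key columns.
        directory = "/".join(f"{col}={row[col]}" for col in partition_cols)
        stored = {k: v for k, v in row.items() if k not in partition_cols}
        files[directory].append(stored)
    return dict(files)


rows = [
    {"name": "Smith", "year": 2010},
    {"name": "Jones", "year": 2010},
    {"name": "Doe", "year": 2011},
]
layout = route_rows(rows, ["year"])
for directory, stored_rows in layout.items():
    print(directory, stored_rows)
```

The year value appears only in the directory name, and the rows kept under each directory carry just the name column. This mirrors how the partitioning columns are kept out of the data files.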

Partition Pruning for Queries

Partition pruning refers to the mechanism where a query can skip reading the data files corresponding to one or more partitions. If you can arrange for queries to prune large numbers of unnecessary partitions from the query execution plan, the queries use fewer resources and are thus proportionally faster and more scalable.

For example, if a table is partitioned by columns YEAR, MONTH, and DAY, then WHERE clauses such as WHERE year = 2013, WHERE year < 2010, or WHERE year BETWEEN 1995 AND 1998 allow Impala to skip the data files in all partitions outside the specified range. Likewise, WHERE year = 2013 AND month BETWEEN 1 AND 3 could prune even more partitions, reading the data files for only a portion of one year.
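
To get a feel for how much each extra condition narrows the work, the following Python sketch counts the partitions that survive each WHERE clause. The table is hypothetical: it is assumed to cover the years 2010 through 2013, and every month is simplified to 28 days so that all months have the same number of partitions.

```python
from itertools import product

# Hypothetical table: partitions for 2010-2013, 12 months, 28 days per month
# (a simplification so that every month contributes the same number of days).
partitions = list(product(range(2010, 2014), range(1, 13), range(1, 29)))


def prune(parts, predicate):
    """Return the partitions whose (year, month, day) key satisfies the predicate."""
    return [p for p in parts if predicate(*p)]


# WHERE year = 2013
year_only = prune(partitions, lambda y, m, d: y == 2013)
# WHERE year = 2013 AND month BETWEEN 1 AND 3
year_and_month = prune(partitions, lambda y, m, d: y == 2013 and 1 <= m <= 3)

print("all partitions:", len(partitions))
print("year = 2013:", len(year_only))
print("year = 2013 AND month BETWEEN 1 AND 3:", len(year_and_month))
```

Under these assumptions, the table has 1344 partitions, the year condition leaves 336, and adding the month condition leaves 84.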

To check the effectiveness of partition pruning for a query, check the EXPLAIN output for the query before running it. The following example shows a table with 3 partitions, where the query reads only 1 of them. The notation #partitions=1/3 in the EXPLAIN plan confirms that Impala can do the appropriate partition pruning.

[localhost:21000] > insert into census partition (year=2010) values ('Smith'),('Jones');
[localhost:21000] > insert into census partition (year=2011) values ('Smith'),('Jones'),('Doe');
[localhost:21000] > insert into census partition (year=2012) values ('Smith'),('Doe');
[localhost:21000] > select name from census where year=2010;
+-------+
| name  |
+-------+
| Smith |
| Jones |
+-------+
[localhost:21000] > explain select name from census where year=2010;
+------------------------------------------------------------------+
| Explain String                                                   |
+------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                  |
|   PARTITION: UNPARTITIONED                                       |
|                                                                  |
|   1:EXCHANGE                                                     |
|                                                                  |
| PLAN FRAGMENT 1                                                  |
|   PARTITION: RANDOM                                              |
|                                                                  |
|   STREAM DATA SINK                                               |
|     EXCHANGE ID: 1                                               |
|     UNPARTITIONED                                                |
|                                                                  |
|   0:SCAN HDFS                                                    |
|      table=predicate_propagation.census #partitions=1/3 size=12B |
+------------------------------------------------------------------+

Impala can even do partition pruning in cases where the partition key column is not directly compared to a constant, by applying the transitive property to other parts of the WHERE clause. This technique is known as predicate propagation, and is available in Impala 1.2.2 and higher. In this example, the census table includes another column indicating when the data was collected, which happens in 10-year intervals. Even though the query does not compare the partition key column (YEAR) to a constant value, Impala can deduce that only the partition YEAR=2010 is required, and again only reads 1 out of 3 partitions.

[localhost:21000] > drop table census;
[localhost:21000] > create table census (name string, census_year int) partitioned by (year int);
[localhost:21000] > insert into census partition (year=2010) values ('Smith',2010),('Jones',2010);
[localhost:21000] > insert into census partition (year=2011) values ('Smith',2020),('Jones',2020),('Doe',2020);
[localhost:21000] > insert into census partition (year=2012) values ('Smith',2020),('Doe',2020);
[localhost:21000] > select name from census where year = census_year and census_year=2010;
+-------+
| name  |
+-------+
| Smith |
| Jones |
+-------+
[localhost:21000] > explain select name from census where year = census_year and census_year=2010;
+------------------------------------------------------------------+
| Explain String                                                   |
+------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                  |
|   PARTITION: UNPARTITIONED                                       |
|                                                                  |
|   1:EXCHANGE                                                     |
|                                                                  |
| PLAN FRAGMENT 1                                                  |
|   PARTITION: RANDOM                                              |
|                                                                  |
|   STREAM DATA SINK                                               |
|     EXCHANGE ID: 1                                               |
|     UNPARTITIONED                                                |
|                                                                  |
|   0:SCAN HDFS                                                    |
|      table=predicate_propagation.census #partitions=1/3 size=22B |
|      predicates: census_year = 2010, year = census_year          |
+------------------------------------------------------------------+
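
The transitive reasoning behind this example can be sketched in a few lines of Python. This is a simplified model of the idea, not Impala's planner: the propagate_constants function is hypothetical, it handles only equality predicates that are ANDed together, and it does not detect contradictory conditions.

```python
def propagate_constants(column_equalities, constant_equalities):
    """Infer col = constant facts implied by col_a = col_b predicates.
    All predicates are assumed to be combined with AND."""
    known = dict(constant_equalities)
    changed = True
    while changed:  # repeat until no new fact can be derived
        changed = False
        for a, b in column_equalities:
            for src, dst in ((a, b), (b, a)):
                if src in known and dst not in known:
                    known[dst] = known[src]
                    changed = True
    return known


# WHERE year = census_year AND census_year = 2010
facts = propagate_constants(
    column_equalities=[("year", "census_year")],
    constant_equalities={"census_year": 2010},
)

partitions = [2010, 2011, 2012]  # values of the YEAR partition key
# If nothing is known about year, every partition is kept.
to_scan = [p for p in partitions if p == facts.get("year", p)]
print(facts["year"], to_scan)  # 2010 [2010]
```

Once year = 2010 has been derived, the partition key can be compared to a constant after all, and only 1 of the 3 partitions remains to be scanned.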

For a more detailed analysis of the volume of data that was actually read and processed, check the output of the PROFILE statement immediately after running the query.

Partition Key Columns

The columns you choose as the partition keys should be ones that are frequently used to filter query results in important, large-scale queries. Popular examples are some combination of year, month, and day when the data has associated time values, and geographic region when the data is associated with some place.

  • For time-based data, split out the separate parts into their own columns, because Impala cannot partition based on a TIMESTAMP column.
  • The data type of the partition columns does not have a significant effect on the storage required, because the values from those columns are not stored in the data files; instead, they are represented as strings inside HDFS directory names.
  • Remember that when Impala queries data stored in HDFS, it is most efficient to use multi-megabyte files to take advantage of the HDFS block size. For Parquet tables, the block size (and ideal size of the data files) is 1GB. Therefore, avoid specifying too many partition key columns, which could result in individual partitions containing only small amounts of data. For example, if you receive 1GB of data per day, you might partition by year, month, and day; while if you receive 5GB of data per minute, you might partition by year, month, day, hour, and minute. If you have data with a geographic component, you might partition based on postal code if you have many megabytes of data for each postal code, but if not, you might partition by some larger region such as city, state, or country.
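
The sizing guidance in the last item comes down to simple division, sketched here in Python. The figures reuse the 1GB-per-day and 5GB-per-minute examples above. The bytes_per_partition helper is hypothetical, and the calculation assumes data arrives at a steady rate throughout the day.

```python
GB = 1024 ** 3
MB = 1024 ** 2

# Number of partitions created per day at each granularity.
PARTITIONS_PER_DAY = {"day": 1, "hour": 24, "minute": 24 * 60}


def bytes_per_partition(bytes_per_day, granularity):
    """Average partition size, assuming data arrives at a steady rate."""
    return bytes_per_day / PARTITIONS_PER_DAY[granularity]


# 1GB per day: daily partitions hold 1GB each, minute partitions under 1MB.
daily_feed = 1 * GB
for g in PARTITIONS_PER_DAY:
    print(f"1GB/day by {g}: {bytes_per_partition(daily_feed, g) / MB:.2f} MB")

# 5GB per minute: even per-minute partitions still hold 5GB each.
busy_feed = 5 * GB * 24 * 60
print(f"5GB/min by minute: {bytes_per_partition(busy_feed, 'minute') / GB:.1f} GB")
```

Partitioning the 1GB-per-day feed down to the minute would leave well under a megabyte in each partition, far below the multi-megabyte file sizes that HDFS handles efficiently. The 5GB-per-minute feed still fills each per-minute partition comfortably.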

Setting Different File Formats for Partitions

Partitioned tables have the flexibility to use different file formats for different partitions. For example, if you originally received data in text format, then received new data in RCFile format, and eventually began receiving data in Parquet format, all that data could reside in the same table for queries. You just need to ensure that the table is structured so that the data files that use different file formats reside in separate partitions.

For example, here is how you might switch from text to Parquet data as you receive data for different years:

[localhost:21000] > create table census (name string) partitioned by (year smallint);
[localhost:21000] > alter table census add partition (year=2012); -- Text format;

[localhost:21000] > alter table census add partition (year=2013); -- Text format switches to Parquet before data loaded;
[localhost:21000] > alter table census partition (year=2013) set fileformat parquet;

[localhost:21000] > insert into census partition (year=2012) values ('Smith'),('Jones'),('Lee'),('Singh');
[localhost:21000] > insert into census partition (year=2013) values ('Flores'),('Bogomolov'),('Cooper'),('Appiah');

At this point, the HDFS directory for year=2012 contains a text-format data file, while the HDFS directory for year=2013 contains a Parquet data file. As always, when loading non-trivial data, you would use INSERT ... SELECT or LOAD DATA to import data in large batches, rather than INSERT ... VALUES, which produces small files that are inefficient for real-world queries.

For other file types that Impala cannot create natively, you can switch into Hive and issue the ALTER TABLE ... SET FILEFORMAT statements and INSERT or LOAD DATA statements there. After switching back to Impala, issue a REFRESH table_name statement so that Impala recognizes any partitions or new data added through Hive.