Step 1 – Fixing Storage
TL;DR: Don’t use Avro unless you need to read almost all the columns almost all the time, and even then, use ORC. Don’t create millions of partitions; keep them below 10,000 if you are using Hive tables. Don’t use Hive for batch processing or regular report generation across any decent-sized data; it’s primarily for ad-hoc querying.
In the previous post, we explained how the Hive + Avro setup on a highly normalized schema was not really working out: the Hive query engine was blowing up with OOM (Out of Memory) exceptions, and queries were taking hours to run and then failing with random-looking errors.
As we started to look into the issues, we found that the partitioning scheme was causing major problems in the Hive Metastore. Hive metadata is stored in a relational database, and when there is a massive number of partitions the metastore starts to run into significant performance issues with non-indexed queries.
To give you an example, a query like “select field_1,field_2,field_3 from table_1 limit 200” does not look like a bad query at first glance. All I am trying to do is look at 200 sample rows for 3 fields from a single table. However, with a large number of partitions, this query can bring down your metastore, since it loads information about every partition into memory. Now imagine running 5-10 of these in parallel: the Hive query generation process blows up, and the entire Hive query engine crashes with Out of Memory errors.
Great, so how do you solve this problem? Well, reduce the number of partitions! There are ways to keep the partitioning, if you like it, and still prevent dangerous query execution using defensive dynamic proxy interceptions, but let’s not go into that solution in this post. If interested, drop us a line at firstname.lastname@example.org.
On to storage refactoring!
We first took the 12-table schema and reduced it to a de-normalized structure with 2 large tables that held truly disjoint information and 2 tiny supporting metadata tables. The large tables were market_data and risk_measures; the reports never joined these two. The metadata tables held instrument information and counter-party information.
We switched the physical storage format to Parquet. We did this because the reports were accessing only 10-20% of the data in these tables, and the most columns used by any single report was around 40%. Avro is a row-based format, so getting 10% of the data out of a row still requires the entire row to be read into memory, and the unused 90% is discarded. Parquet supports columnar reads, so if you are reading 10% of the columns, only those columns are loaded into memory. More details can be found here: https://parquet.apache.org/
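To make the difference concrete, here is a toy cost model in Python. It is only a sketch: it does not reflect how Avro or Parquet are actually implemented, and the table width, row count and column positions are made-up numbers chosen to match the "10% of columns" case described above.

```python
# Toy model only: this is NOT how Avro or Parquet work internally.
# It simply counts how many values a scan has to touch under each layout.

NUM_ROWS = 1_000                 # hypothetical row count
NUM_COLUMNS = 50                 # hypothetical table width
NEEDED = [0, 7, 12, 30, 41]      # 5 of 50 columns, i.e. 10%


def values_read_row_layout(num_rows, num_columns):
    """Row layout: every row is read whole, then unneeded fields are dropped."""
    return num_rows * num_columns


def values_read_columnar_layout(num_rows, needed_columns):
    """Columnar layout: only the requested columns are read."""
    return num_rows * len(needed_columns)


row_cost = values_read_row_layout(NUM_ROWS, NUM_COLUMNS)
col_cost = values_read_columnar_layout(NUM_ROWS, NEEDED)
discarded = 1 - col_cost / row_cost

print(f"row layout reads      {row_cost} values")
print(f"columnar layout reads {col_cost} values")
print(f"read and discarded under row layout: {discarded:.0%}")
```

In this simplified model the row layout touches ten times as many values for the same report, which is the loading and discarding churn we were paying for on every query.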
We reduced the partition structure to have far fewer partitions. Previously, the partitioning was equivalent to every trading desk having its own partition per day. So within US Equity trading, separate partitions were created each day for structured trading, volatility trading, cash trading, delta-1 trading and so on. We reduced this to 3 regions (1 for the US, 1 for EMEA, 1 for ROW, i.e. Rest of World), each with 1 partition per year for historical data, monthly partitions for the current year, and 2 partitions for the previous day and current day. So we ended up with a total of 3 (regions) * (9 (years) + 12 (months) + 2 (days)) * 4 (tables) = 276 partitions, versus nearly 12 (tables) * 250 (days/year) * 80 (desks) * 10 (years) = 2,400,000 partitions!
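The arithmetic is easy to check. The short Python snippet below just recomputes both partition counts from the figures quoted above; the variable names are ours and carry no meaning beyond this illustration.

```python
# Recompute the partition counts from the figures quoted in the post.

# New layout
regions = 3        # US, EMEA, ROW
yearly = 9         # one partition per historical year
monthly = 12       # monthly partitions
daily = 2          # previous day and current day
new_tables = 4     # 2 large tables + 2 metadata tables

new_partitions = regions * (yearly + monthly + daily) * new_tables

# Old layout
old_tables = 12
trading_days_per_year = 250
desks = 80
years = 10

old_partitions = old_tables * trading_days_per_year * desks * years

print(new_partitions)                    # 276
print(old_partitions)                    # 2400000
print(old_partitions // new_partitions)  # 8695
```

That is a reduction of roughly 8,700 times in the number of partitions the metastore has to track.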
The impact of these changes:
It took us about a week to slow-roll the data into the new storage structure. We did the historical data rewrite at the backup site, performed our validation, and then made the cutover on a weekend. All in all, it was a 2-week effort from start to finish to change the storage structure.
Cutting over the query structure was surprisingly easy. Hive allows you to use Avro or Parquet with minimal changes.
Hive OOM simply went away. No crashes, no GC thrashing .. humming along quietly.
Query performance improved by 100%: all the loading and discarding churn was rather expensive. Also, the intermediate table generation across tiny partitions was replaced by bulk writes to large partitions, which Hadoop excels at.
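On the point that Hive lets you move between Avro and Parquet with minimal changes: at the table-definition level, the storage format is a single clause. The Python sketch below renders a Hive CREATE TABLE statement for each format so the difference is visible. The column names, types and partition columns are hypothetical (they are not our real schema); only the market_data table name comes from this post. It assumes a Hive version recent enough to support the STORED AS AVRO and STORED AS PARQUET shorthands.

```python
def create_table_ddl(table, columns, partition_columns, storage_format):
    """Render a Hive CREATE TABLE statement for the given storage format."""
    cols = ",\n  ".join(f"{name} {hive_type}" for name, hive_type in columns)
    parts = ", ".join(f"{name} {hive_type}" for name, hive_type in partition_columns)
    return (
        f"CREATE TABLE {table} (\n  {cols}\n)\n"
        f"PARTITIONED BY ({parts})\n"
        f"STORED AS {storage_format}"
    )


# Hypothetical columns and partition keys, for illustration only.
columns = [("instrument_id", "STRING"), ("price", "DOUBLE"), ("as_of", "TIMESTAMP")]
partition_columns = [("region", "STRING"), ("period", "STRING")]

avro_ddl = create_table_ddl("market_data", columns, partition_columns, "AVRO")
parquet_ddl = create_table_ddl("market_data", columns, partition_columns, "PARQUET")

print(parquet_ddl)
```

The two statements differ only in the final STORED AS clause, and queries against the table do not need to know which format sits underneath.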
In the next post: when 100% is not good enough! How to take it to the next level.