Fitting Presto to Large-Scale Apache Kafka at Uber
The need for ad-hoc real-time data analysis has been growing at Uber. The company runs a large Apache Kafka deployment and needs to analyze the data flowing through the many workflows it supports. Solutions such as stream processing and OLAP datastores were deemed unsuitable. A recently published article details why Uber chose Presto for this purpose and what it had to do to make Presto performant at scale.
An example of a request that Uber’s Data team may receive is “is the order with UUID X missing in the Kafka topic T?”. Such a request might originate from a team investigating some sort of problem with the system and naturally involves analyzing real-time data from Kafka topics.
According to the article’s authors, such problems are typically addressed with real-time Big Data analytics. They evaluated open-source solutions in two categories: stream processing engines and real-time OLAP datastores.
Stream processing engines (e.g., Apache Flink, Apache Storm, ksqlDB) continuously process data streams and output another stream with the processing results, or update a view over the data. Consequently, the data coming out of a stream processing engine reflects the present state. Since data analysis requests such as the one in the example above involve looking at data from the past, stream processing engines are not a good fit.
The authors noted that real-time OLAP datastores such as Apache Pinot, Apache Druid, and ClickHouse are better suited to this sort of request. They can index Kafka streams and support low-latency queries. Pinot is already in use at Uber.
The disadvantage of real-time OLAP datastores is that a table must first be created to store the data ingested from the Kafka stream and then tuned for performance. On top of that, their storage and compute requirements are non-negligible. Real-time OLAP datastores are thus a good fit for queries that must be executed repeatedly with low latency, not for ad-hoc requests.
Uber already runs a mature Presto deployment. Presto requires no upfront preparation, since Kafka topics can be queried as soon as they are created, and it can correlate data between Kafka and other data sources. The team therefore decided to use Presto to enable ad-hoc queries over real-time data.
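As a sketch of what such a query might look like, assuming a Presto catalog named `kafka` that exposes the topic as a table and a Hive catalog holding offline data (all catalog, table, and column names here are illustrative, not taken from the article):

```sql
-- Check whether a given order appears in the Kafka topic at all
-- (catalog, topic, and column names are illustrative)
SELECT count(*)
FROM kafka.default.orders
WHERE order_uuid = '123e4567-e89b-12d3-a456-426614174000';

-- Correlate the real-time stream with an offline Hive table in the same query
SELECT o.order_uuid, c.city_name
FROM kafka.default.orders o
JOIN hive.core.cities c ON o.city_id = c.city_id
WHERE o.order_uuid = '123e4567-e89b-12d3-a456-426614174000';
```

The ability to join the stream against other catalogs in one statement is what distinguishes this approach from querying an OLAP store that only holds the ingested Kafka data.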
In comparison with real-time OLAP stores, Presto is less performant because it lacks indexing out of the box. Additionally, in Uber’s dynamic Kafka landscape, it needs to support dynamic discovery of Kafka topics and data schemas. Limiting query data sizes and consumption rates is also necessary to keep Uber’s Kafka clusters healthy and to ensure a good user experience.
The authors also describe how they modified Presto’s Kafka connector to address some of these problems.
To dynamically refresh cluster and topic metadata and data schemas, the team implemented a new strategy to fetch them from Uber’s Kafka cluster management service and schema registry at runtime.
The amount of data a query reads is limited by requiring a timestamp or partition-offset field in every query. New column filters enforce this requirement by rejecting queries that lack those fields.
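The open-source Presto Kafka connector exposes hidden columns such as `_partition_offset` and `_timestamp` on every row, so an enforcement rule along these lines would accept a bounded query and reject an unbounded one (the table and column names below are illustrative; the exact predicate form Uber requires is not specified in the article):

```sql
-- Accepted: the scan is bounded by a predicate on the hidden timestamp column
SELECT order_uuid, _partition_id, _partition_offset
FROM kafka.default.orders
WHERE _timestamp > now() - interval '1' hour
  AND order_uuid = '123e4567-e89b-12d3-a456-426614174000';

-- Rejected: no timestamp or offset filter, so the query would
-- scan the entire topic from the earliest retained offset
SELECT order_uuid
FROM kafka.default.orders
WHERE order_uuid = '123e4567-e89b-12d3-a456-426614174000';
```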
The final concern, Presto’s record consumption rate, is addressed by defining a quota on the Kafka broker, combined with a new Presto Kafka connector configuration item that allows setting the Kafka consumer client ID. Setting the same client ID on all Presto workers makes them subject to the same Kafka broker quota pool.
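A minimal sketch of the broker side of this setup, assuming the quota is defined with Kafka’s standard per-client-ID quota mechanism and that all Presto workers identify themselves with the client ID `presto` (the client-ID value and byte rate are illustrative; the name of the connector configuration item Uber added is not given in the article):

```shell
# Cap the aggregate consumption rate for all clients using client ID "presto"
# to ~50 MB/s across the cluster (value illustrative)
kafka-configs.sh --bootstrap-server broker:9092 --alter \
  --entity-type clients --entity-name presto \
  --add-config 'consumer_byte_rate=52428800'
```

Because Kafka applies client quotas per client ID, every Presto worker configured with that same ID draws from the same quota pool, which is what throttles Presto’s total read rate against the cluster.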
Limiting the record consumption rate also means that concurrent Presto queries take longer to complete, a compromise the team was happy to accept. In practice, the authors found that most queries complete within a reasonable time.
The authors note that the experience of Uber’s engineers has improved since the rollout of this support for ad-hoc queries over real-time data: they can now issue SQL queries and often obtain results within seconds.
Uber intends to contribute its improvements to Presto back to the community. The authors also describe this implementation in their Presto on Kafka at Scale talk.