If you’re working in modern data analytics, you likely have some go-to tools. One of the most useful tools in a data analyst’s toolbox is the count-distinct function, which lets you count the number of unique values in a column in a data set. (Think of this as counting distinct elements in a data stream where there are repeated elements.) This can come in handy when gathering all types of business data—for example:
How many unique visitors came to our website last year
How many users got to a certain level of our online game today
How many different IP addresses did a slice of network traffic come from
How many unique visitors searched for a particular news event in a single day
How many unique IoT devices started to report error codes after the code rollout
However, as the number of unique items gets larger, the calculation to obtain this number exactly requires memory in proportion to the number of unique items. Another problem is that computing the values for unique visitors per day doesn’t mean you can simply add seven days of results together to get the unique count for the week, because that would overcount visitors seen on multiple days.
A simple way to compute an exact count-distinct is to create a set structure. Then, as you process elements in the input, add each one to the set if it is not in it yet, incrementing a set-size counter whenever you do. That looks something like this:
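A minimal sketch of this set-based approach in Java; the class and method names are illustrative and not taken from the original post.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ExactCountDistinct {
    // Returns the exact number of distinct elements in the input.
    // Memory use grows with the number of distinct elements, not the input length.
    static long countDistinct(Iterable<String> input) {
        Set<String> seen = new HashSet<>();
        long count = 0;
        for (String element : input) {
            // add() returns true only when the element was not in the set yet.
            if (seen.add(element)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> visits = List.of("alice", "bob", "alice", "carol", "bob");
        System.out.println(countDistinct(visits)); // prints 3
    }
}
```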
The set will continue to grow in direct proportion to the cardinality of the dataset (the number of unique elements in the dataset), using large amounts of memory for large input cardinalities. You’ll have to consider the memory needed for the set, as well as how to handle it in distributed processing, where data is spread across many machines. Count-distinct results from several machines cannot just be summed up, because the sets from different machines will overlap (unless the data has been repartitioned, such as with a hash partition, so that each value lands on exactly one machine). To deduplicate the sets and obtain an exact count, every leaf machine’s entire set needs to be sent to the root machine and merged, which requires huge amounts of I/O.
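The overlap problem can be illustrated with two hypothetical workers that each hold a partial set. This is a toy, single-process sketch; the names `workerA` and `workerB` are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DistributedExactCount {
    // The partial state one worker would hold: the set of IDs it has seen.
    static Set<String> partialSet(List<String> shard) {
        return new HashSet<>(shard);
    }

    // Naive combination: add up the per-worker distinct counts.
    // This overcounts any ID that appears on more than one worker.
    static long sumOfCounts(List<Set<String>> partials) {
        long total = 0;
        for (Set<String> partial : partials) {
            total += partial.size();
        }
        return total;
    }

    // Exact combination: move every partial set to one place and take the union.
    static long unionCount(List<Set<String>> partials) {
        Set<String> merged = new HashSet<>();
        for (Set<String> partial : partials) {
            merged.addAll(partial);
        }
        return merged.size();
    }

    public static void main(String[] args) {
        Set<String> workerA = partialSet(List.of("alice", "bob", "alice"));
        Set<String> workerB = partialSet(List.of("bob", "carol"));
        List<Set<String>> partials = List.of(workerA, workerB);
        System.out.println(sumOfCounts(partials)); // prints 4: "bob" is counted twice
        System.out.println(unionCount(partials));  // prints 3: the exact answer
    }
}
```

The same arithmetic explains why seven daily unique-visitor counts cannot be added to get a weekly count.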
Performing count-distinct, faster and cheaper
So how do you avoid massive memory costs for count distinct, or make the computation feasible in the first place? Data analysts will often require only an approximate count, with a small error margin being acceptable. Allowing for a small error in the result opens the door for massively cheaper computations: Approximate algorithms can compute count distinct with a fraction of the memory and I/O of an exact solution, at the price of results which are, for example, within 0.5% of the exact solution.
A widely used approximate algorithm is HyperLogLog. Google’s implementation of and improvements to this algorithm are discussed in detail in the paper “HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm.” This implementation is known as HyperLogLog++ (we’ll refer to it as HLL++ for the rest of this post).
The HLL++ algorithm makes it possible to store the intermediate state of an aggregation in a very compact form, called a sketch. Sketches are constant in size, unlike our earlier set objects, which grow linearly with the number of unique elements. They lend themselves well to distributed processing frameworks, since you can efficiently transfer aggregation states over the wire.
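To give a feel for how sketch size trades off against accuracy, the snippet below evaluates the commonly quoted relative standard error of the original HyperLogLog estimator, roughly 1.04/√m for m registers. This is an assumption borrowed from the original HyperLogLog analysis, not a statement about the exact error of HLL++ or of any particular library, and the `precision` parameter name is illustrative.

```java
class HllErrorEstimate {
    // Approximate relative standard error of a plain HyperLogLog estimate
    // that uses 2^precision registers (assumption: the 1.04 / sqrt(m) rule).
    static double relativeStandardError(int precision) {
        int registers = 1 << precision;
        return 1.04 / Math.sqrt(registers);
    }

    public static void main(String[] args) {
        for (int precision = 10; precision <= 16; precision += 2) {
            System.out.printf("precision=%d registers=%d error~%.2f%%%n",
                    precision, 1 << precision, 100 * relativeStandardError(precision));
        }
    }
}
```

Each additional two bits of precision quadruples the number of registers and halves the expected error.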
A further important benefit of sketches is that you can reuse and merge results for different time periods without needing to go back to the raw data. In our example counting unique visitors, a sketch can be produced and stored, summarizing the data for each day. To compute sliding window stats like seven-day active users, you can just reuse and merge the sketches for the relevant seven days instead of computing everything from scratch!
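The merge property can be seen in a toy, plain HyperLogLog written in Java. This is a simplified illustration only: it is not HLL++, it is not the sketch format used by Beam or BigQuery, and the class name, the hash function (a SplitMix64-style mixer) and the precision of 12 are all assumptions chosen for the example.

```java
class ToyHllSketch {
    private final int precision;     // number of hash bits used to pick a register
    private final byte[] registers;  // fixed-size state: 2^precision one-byte registers

    ToyHllSketch(int precision) {
        if (precision < 7 || precision > 16) {
            throw new IllegalArgumentException("precision must be in [7, 16]");
        }
        this.precision = precision;
        this.registers = new byte[1 << precision];
    }

    // SplitMix64-style mixer, used here as a stand-in hash function.
    static long mix64(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    void add(long value) {
        long hash = mix64(value);
        int index = (int) (hash >>> (64 - precision)); // top bits choose the register
        long rest = hash << precision;                 // remaining bits, left-aligned
        // Rank = 1-based position of the first 1 bit in the remaining bits.
        int rank = (rest == 0) ? (64 - precision + 1) : Long.numberOfLeadingZeros(rest) + 1;
        if (rank > registers[index]) {
            registers[index] = (byte) rank;
        }
    }

    // Merging keeps the larger value of each register pair; no raw data is needed.
    static ToyHllSketch merge(ToyHllSketch a, ToyHllSketch b) {
        if (a.precision != b.precision) {
            throw new IllegalArgumentException("precision mismatch");
        }
        ToyHllSketch out = new ToyHllSketch(a.precision);
        for (int i = 0; i < out.registers.length; i++) {
            out.registers[i] = (byte) Math.max(a.registers[i], b.registers[i]);
        }
        return out;
    }

    long estimate() {
        int m = registers.length;
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.scalb(1.0, -r); // 2^(-r)
            if (r == 0) {
                zeros++;
            }
        }
        double alpha = 0.7213 / (1 + 1.079 / m); // bias constant for m >= 128
        double raw = alpha * m * m / sum;
        if (raw <= 2.5 * m && zeros > 0) {
            // Small-range correction: fall back to linear counting.
            return Math.round(m * Math.log((double) m / zeros));
        }
        return Math.round(raw);
    }

    int sizeInBytes() {
        return registers.length;
    }

    public static void main(String[] args) {
        ToyHllSketch monday = new ToyHllSketch(12);
        ToyHllSketch tuesday = new ToyHllSketch(12);
        for (long id = 0; id < 60_000; id++) monday.add(id);        // visitors 0..59999
        for (long id = 40_000; id < 100_000; id++) tuesday.add(id); // overlaps with Monday
        ToyHllSketch both = merge(monday, tuesday);
        System.out.println("Monday  ~ " + monday.estimate());
        System.out.println("Tuesday ~ " + tuesday.estimate());
        System.out.println("Both    ~ " + both.estimate() + " (true value: 100000)");
        System.out.println("Sketch size in bytes: " + both.sizeInBytes());
    }
}
```

Because merging only takes the per-register maximum, the merged sketch is identical to one built from the combined raw data, which is what makes daily sketches reusable for weekly or monthly rollups.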
The Google implementation of HyperLogLog includes several improvements to the original algorithm: a compact and standardized sketch format, and a special higher accuracy mode for small cardinalities.
This implementation was added to BigQuery in 2017 and has recently been open sourced and made directly available in Apache Beam as of version 2.16. That means it’s available for use in Cloud Dataflow, our fully managed service for transforming and enriching data in stream (real-time) and batch (historical) modes with equal reliability and expressiveness.
Let’s explore the use of HLL++ across several pipelines, using data coming from a streaming source:
1. Outputting the approximate count distinct directly
2. Outputting sketches to BigQuery, allowing for interoperability between BigQuery and Cloud Dataflow
3. Using BigQuery to run analytics queries against the sketches stored in step 2
4. Outputting the sketches and metadata to Cloud Storage
5. Merging and extracting results from files stored on Cloud Storage
We’ll show you how to use the transform directly to output results as well as store the aggregated sketches in BigQuery and, along with metadata, as Avro files in Cloud Storage.
Building HLL++-enabled pipelines
Apache Beam lets you process and analyze data as a stream, so making use of the Beam API with real-time data is simply a matter of adding it to a pipeline.
The following operations, added in Beam 2.16, will reappear throughout the examples below:
HllCount.Init: aggregates data into a sketch;
HllCount.MergePartial: combines multiple sketches into one;
HllCount.Extract: extracts the estimated count of distinct elements from a sketch.
Check out more details in the HllCount transform catalog entry. With these building blocks, we can now go ahead and explore a few use cases:
1. Compute approximate count of unique visitors to a website
Note: When testing, you can easily create a stream of values using the GenerateSequence utility class in Beam, which generates a sequence of data for you in stream mode.
The pipeline below will process the IDs of everyone visiting our website and compute the approximate count based on the durationOfWindow.
2. Generate unique visitors per page count
The example above used a PCollection of anonymized IDs of visitors to our website, but what if we wanted a count based on specific pages? For that we can make use of the transforms’ ability to work with key-value pairs. In the example below, the key is the webpage identifier and the value is the visitor ID.
3. Storing the sketches in BigQuery
BigQuery supports HLL++ via the HLL_COUNT functions, and BigQuery’s sketches are fully compatible with Beam’s, so it’s easy to interoperate with sketch objects across both systems. This use case mirrors usage of HLL++ within Google with Flume and BigQuery.
Even when using approximate algorithms, Google-order-of-magnitude data sets can still be too large for analysts to query at interactive/"online" speeds (especially for expensive statistics like count distinct). The remedy is to pre-aggregate data into cubes using Google Flume pipelines, the internal version of Cloud Dataflow. End-user queries just filter and merge (roll up) over dozens of rows in the cube.
For functions like sum or count, the rollup is trivial, but what about count distinct? As you will recall from earlier, it is not possible to simply sum up count distinct values corresponding to a small time interval (day) into larger time intervals (week or month). With sketches, this becomes possible as the values can simply be merged.
In the example below we will:
Pre-aggregate data into sketches in Beam;
Store the sketches in BigQuery as byte columns along with some metadata about the time interval;
Run a rollup query in BigQuery, which can extract the results at interactive speed, thanks to the sketches that were pre-computed in Beam.
With the bytes stored in BigQuery we can use the BigQuery HLL_COUNT.* functions to merge the sketches and extract the counts, with the following SQL query:
Note: If you have sketches with different intervals in the same table, then you will need to use Window_Start and Window_End time in the WHERE clause.
4. Storing the sketches externally
Now, let’s output the results of the sketches to a file system. This is useful when generating feature files ready for machine learning, where Cloud Dataflow is being used to enrich multiple sources of data before the modeling phase. The sketches’ ability to merge together means there’s no need to reprocess all of the data again. You just have to gather the files that correspond to the relevant time interval.
There are many Apache Beam SinkIOs that can be used to create and store files. We will explore three of the common ones: FileIO, TextIO, AvroIO.
TextIO will write out a PCollection<String> to a text file, with each element delimited by a newline character. Since a sketch is a byte array (byte[]), you need to Base64-encode the bytes before writing them with TextIO. This is a nice, straightforward approach with a small processing overhead.
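The encoding step can be sketched with the JDK’s `java.util.Base64`. The byte values below are stand-ins rather than a real HLL++ sketch, and the class name is illustrative.

```java
import java.util.Arrays;
import java.util.Base64;

class SketchTextCodec {
    // Encodes raw sketch bytes as a single line of text (the basic encoder adds no line breaks).
    static String encode(byte[] sketch) {
        return Base64.getEncoder().encodeToString(sketch);
    }

    // Restores the original bytes from a line previously produced by encode().
    static byte[] decode(String line) {
        return Base64.getDecoder().decode(line);
    }

    public static void main(String[] args) {
        // Stand-in bytes, including 0x0A (newline), which would break a
        // newline-delimited file if written without encoding.
        byte[] sketch = {0x0A, 0x00, (byte) 0xFF, 0x0A, 0x7F};
        String line = encode(sketch);
        System.out.println(line);
        System.out.println(Arrays.equals(sketch, decode(line))); // prints true
    }
}
```

When reading the file back, each line would be decoded to bytes again before the sketches are merged.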
FileIO lets you create a file, but you then need to write a custom writer/reader to output the byte[] into the file, including any mechanisms to deal with multiple sketch objects in a single file, like value separators.
Using a format like Avro lets you store the sketches (as bytes) together with metadata. Avro is also a widely used serialization format, with connectors available for many systems. Due to its convenience, we'll use AvroIO for this example.
First, create an object type that supports some extra metadata about the sketch. The @DefaultCoder annotation lets you use the AvroCoder coder in the pipeline.
Using this class, you can write a pipeline that turns our stream of key-value sketches into our custom HLLAvroContainer objects, ready for output to a file.
For extra optimization, let’s add the start and end time as metadata to the filename as well. This lets you easily use a glob pattern when reading the files to process only items you’re interested in, as opposed to having to read all files and apply a filter.
The code snippet below will generate files to GS_FOLDER with a file name format of "2019-10-22T08:33:00.000Z-2019-10-22T16:33:00.000Z-pane-0-last-00000-of-00001". You can use AvroIO’s more advanced options to write your own filenames to act as additional metadata (not shown).
It’s also possible to push sketch objects from many windows into a single file. In order to do this, you would need to apply a larger window after the creation of the HLLAvroContainer but before the AvroIO write, such as Window.into(FixedWindows.of(<1 week>)).
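Because the window start and end are embedded in each file name, the relevant files can be picked out without opening them. The snippet below is a small sketch of that idea in plain Java, assuming file names follow the format shown above (two ISO-8601 instants separated by a hyphen); the class and method names are hypothetical.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class WindowedFileSelector {
    // Length of one timestamp such as "2019-10-22T08:33:00.000Z".
    private static final int INSTANT_LENGTH = "2019-10-22T08:33:00.000Z".length();

    // The window start is the first timestamp in the file name.
    static Instant windowStart(String fileName) {
        return Instant.parse(fileName.substring(0, INSTANT_LENGTH));
    }

    // The window end follows the start, separated by a single '-'.
    static Instant windowEnd(String fileName) {
        int from = INSTANT_LENGTH + 1;
        return Instant.parse(fileName.substring(from, from + INSTANT_LENGTH));
    }

    // Keeps only files whose whole window lies inside [rangeStart, rangeEnd].
    static List<String> select(List<String> fileNames, Instant rangeStart, Instant rangeEnd) {
        List<String> selected = new ArrayList<>();
        for (String name : fileNames) {
            boolean startsInRange = !windowStart(name).isBefore(rangeStart);
            boolean endsInRange = !windowEnd(name).isAfter(rangeEnd);
            if (startsInRange && endsInRange) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> files = List.of(
                "2019-10-22T08:33:00.000Z-2019-10-22T16:33:00.000Z-pane-0-last-00000-of-00001",
                "2019-11-05T08:33:00.000Z-2019-11-05T16:33:00.000Z-pane-0-last-00000-of-00001");
        Instant novemberStart = Instant.parse("2019-11-01T00:00:00Z");
        Instant decemberStart = Instant.parse("2019-12-01T00:00:00Z");
        // Prints only the file whose window falls in November 2019.
        System.out.println(select(files, novemberStart, decemberStart));
    }
}
```

Since each name begins with the window start, a simple prefix pattern such as `2019-11-*` should select the same files when every window starts and ends within the month.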
5. Reading the sketches from external storage
With the output files in a folder, you can now read all of the individual files from November 2019 and merge them to get the approximate count distinct for that month.
This will result in an output line per key, since all the results have been merged into a single result.
HLL++ efficiently solves the count-distinct problem for large data sets with a small error margin, solving for the large majority of data analysts’ needs around count distinct. By its incorporation within Apache Beam, Cloud Dataflow offers you this algorithm for both streaming and batch processing pipelines. You can switch between the two easily, depending on your needs, and also move the intermediate aggregation state seamlessly back and forth between Beam and BigQuery.
Note: As of version 2.16, there are several implementations of approximate count algorithms. We recommend the use of HllCount.java, especially if you need sketches and/or need compatibility with Google Cloud BigQuery.
Among the other implementations, ApproximateUnique.java does not expose its intermediate aggregation state in the form of sketches and has lower accuracy; ApproximateDistinct.java is a reimplementation of the algorithm described in the HLL++ paper, but its sketch format is not compatible with BigQuery.
Source: Google Cloud Blog