# Publishing Dropwizard Metrics to Kafka

This post is about combining Dropwizard Metrics with Kafka to create self-instrumenting applications that produce durable streams of application metrics, which can be processed (and re-processed) in many ways. The solution is appealing for two reasons: Kafka is increasingly popular, and therefore likely to be available infrastructure; and Dropwizard Metrics is likewise widely used, being leveraged by many open source frameworks, with many plugins for common measurements such as JVM and web application metrics.

#### Dropwizard

Dropwizard Metrics allows you to add application metrics as an aspect of your application quickly. An application instrumented with Dropwizard consists of a MetricRegistry – basically an in-memory key-value store of the state of named metrics – and one or more Reporters. There are several built-in reporters including ConsoleReporter, CsvReporter, GangliaReporter and GraphiteReporter (the Ganglia and Graphite reporters require that you are actually running these services). An unofficial reporter designed for Ambari Metrics is hosted here. Nobody really wants to work with JMX anymore, but, just in case you’re working with prehistoric code, there is also a JmxReporter available out of the box. Reporters are very loosely coupled with the instrumentation cut points throughout your code, so it’s very easy to change a reporting strategy. Instrumenting an application manually is extremely simple – you just can’t go wrong following the getting started page – and there are several annotation processing mechanisms for instrumenting methods; for instance, there are numerous integrations to be found on GitHub for frameworks like Spring. Indeed, I wrote my own annotation binding using Guice type listeners on a recent project, which was certainly easy enough (using techniques in this post on type listeners).

#### Kafka

The only work that needs to be done is to extend the Reporter mechanism to use Kafka as a destination. Kafka is fast, but the real beauty of writing metrics to Kafka is that you can do what you want with them afterwards. If you want to replicate them in real time onto ZeroMQ topics, you can do that just as easily as you can run Spark Streaming or a scheduled Spark batch job over your application metrics. If you’re building your own monitoring dashboard, you could imagine having a real-time latest value, along with hourly or daily aggregations. In fact, you can process the metrics at whatever frequency you wish within Kafka’s retention period. I truly believe your application metrics belong in Kafka, at least in the short term.

#### Extending ScheduledReporter

The basic idea is to extend ScheduledReporter and compose it with a KafkaProducer. ScheduledReporter is, unsurprisingly, invoked repeatedly at a specified rate. On each invocation, the idea is to loop through all gauges, counters, histograms, meters and timers, serialise them (there may be a performance boost available from CBOR), and send them to Kafka via the KafkaProducer on a configurable topic. Then wherever in your application you would have created, say, an Slf4jReporter, just create a KafkaReporter instead.

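#### Code

```xml
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.6</version>
</dependency>
<!-- needed for CBORFactory in the usage example -->
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-cbor</artifactId>
    <version>2.8.6</version>
</dependency>
```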

Whether you like them or not, all metrics reporters come with builders, so to be consistent you need to implement one. The builder needs to collect some details about Kafka so it knows where to send the metrics. In this example the reporter is responsible for the serialisation format, which is why the builder accepts an ObjectMapper; if the format were factored out into its own abstraction, that abstraction would need to be exposed on the builder instead. In common with all reporters, there are configuration parameters relating to default units etc. which must be exposed for the sake of consistency.

```java
public static class KafkaReporterBuilder {

    private final MetricRegistry registry;
    private final KafkaProducer<String, byte[]> producer;
    private final String topic;
    private String name = "KafkaReporter";
    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
    private TimeUnit rateUnit = TimeUnit.SECONDS;
    private ObjectMapper mapper;

    public KafkaReporterBuilder(MetricRegistry registry,
                                KafkaProducer<String, byte[]> producer,
                                String topic) {
        this.registry = registry;
        this.producer = producer;
        this.topic = topic;
    }

    public KafkaReporterBuilder withName(String name) {
        this.name = name;
        return this;
    }

    public KafkaReporterBuilder withTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
        return this;
    }

    public KafkaReporterBuilder withRateUnit(TimeUnit rateUnit) {
        this.rateUnit = rateUnit;
        return this;
    }

    public KafkaReporterBuilder withMapper(ObjectMapper mapper) {
        this.mapper = mapper;
        return this;
    }

    public KafkaReporter build() {
        return new KafkaReporter(registry,
                name,
                MetricFilter.ALL,
                rateUnit,
                timeUnit,
                mapper == null ? new ObjectMapper() : mapper,
                topic,
                producer);
    }
}
```

Here we will use the metric name as the key of the message, because all messages for the same metric need to go to the same partition to guarantee chronological order. The builder therefore takes a KafkaProducer with String keys and byte[] values – the name will be the key, the serialised metric will be the value. It’s better for testability to defer the construction of the KafkaProducer to the caller, so the producer can be mocked, but KafkaProducers are really easy to construct from properties files; for instance, see the Javadoc.
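To see why keying by metric name preserves ordering, here is a minimal sketch of keyed partitioning. It is not Kafka's code: Kafka's default partitioner hashes the serialised key with murmur2, whereas the hypothetical `partitionFor` below uses `Arrays.hashCode` as a stand-in. The only point is that the partition is a deterministic function of the key, so every record for a given metric lands on the same partition (for as long as the number of partitions stays the same).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyedPartitioning {

    // Hypothetical stand-in for the partitioner: deterministic, but not murmur2.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        String[] keys = {"jvm.memory.heap.used", "http.requests", "jvm.memory.heap.used"};
        for (String key : keys) {
            // The first and third lines always report the same partition.
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Ordering is only guaranteed within a partition, which is why the metric name, rather than a random or null key, is the natural choice here.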

The next step is to implement the reporter.

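```java
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class KafkaReporter extends ScheduledReporter {

    // The KafkaReporterBuilder shown above is nested here as a static class.

    private final KafkaProducer<String, byte[]> producer;
    private final ObjectMapper mapper;
    private final String topic;

    protected KafkaReporter(MetricRegistry registry,
                            String name,
                            MetricFilter filter,
                            TimeUnit rateUnit,
                            TimeUnit durationUnit,
                            ObjectMapper mapper,
                            String topic,
                            KafkaProducer<String, byte[]> producer) {
        super(registry, name, filter, rateUnit, durationUnit);
        this.producer = producer;
        this.mapper = mapper;
        this.topic = topic;
    }

    // Called by ScheduledReporter at the configured rate.
    @Override
    public void report(SortedMap<String, Gauge> gauges,
                       SortedMap<String, Counter> counters,
                       SortedMap<String, Histogram> histograms,
                       SortedMap<String, Meter> meters,
                       SortedMap<String, Timer> timers) {
        report(gauges);
        report(counters);
        report(histograms);
        report(meters);
        report(timers);
    }

    // Sends one record per metric, keyed by the metric name.
    private void report(SortedMap<String, ?> metrics) {
        metrics.entrySet()
                .stream()
                .map(kv -> toRecord(kv.getKey(), kv.getValue(), this::serialise))
                .forEach(producer::send);
    }

    private <T> ProducerRecord<String, byte[]> toRecord(String metricName, T metric, Function<T, byte[]> serialiser) {
        return new ProducerRecord<>(topic, metricName, serialiser.apply(metric));
    }

    // The wire format (JSON, CBOR, ...) is determined by the ObjectMapper's factory.
    private byte[] serialise(Object value) {
        try {
            return mapper.writeValueAsBytes(value);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Value not serialisable: " + value, e);
        }
    }
}
```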

To use it to publish all application metrics to Kafka in CBOR format, once every five seconds:

```java
MetricRegistry registry = ...
Properties kafkaProperties = ...
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProperties);
// CBORFactory comes from the jackson-dataformat-cbor module
KafkaReporter reporter = new KafkaReporter.KafkaReporterBuilder(registry, producer, "topic")
        .withMapper(new ObjectMapper(new CBORFactory()))
        .build();
reporter.start(5, TimeUnit.SECONDS);
...
reporter.stop();
```

# A Quick Look at RoaringBitmap

This article is an introduction to the data structures found in the RoaringBitmap library, which I have been making extensive use of recently. I wrote some time ago about the basic idea of bitmap indices, which are used in various databases and search engines, with the caveat that no traditional implementation is optimal across all data scenarios (in terms of size of the data set, sparsity, cardinalities of attributes and global sort orders of data sets with respect to specific attributes). RoaringBitmap is a dynamic data structure which aims to be that one-size-fits-all solution across all scenarios.

#### Containers

A RoaringBitmap should be thought of as a set of unsigned integers, consisting of containers which cover disjoint subsets. Each subset can contain values from a range of size $2^{16}$, and the subset is indexed by a 16 bit key. Because the most significant 16 bits of every value in a container are held once in the key, each value within the container needs only its least significant 16 bits, so unsigned 32 bit integers can be stored as Java shorts. The choice of container size also means that in the worst case (a bitmap of $2^{16}$ bits, or 8 KiB) the container will still fit in L1 cache on a modern processor.
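The split into key and container value is just bit manipulation. The sketch below is my own illustration, not the library's code (the class and method names are hypothetical): it separates a 32 bit value into its 16 bit key and 16 bit low part and puts them back together.

```java
final class RoaringKeys {

    // The most significant 16 bits select the container.
    static short key(int value) {
        return (short) (value >>> 16);
    }

    // The least significant 16 bits are what the container stores.
    static short low(int value) {
        return (short) value;
    }

    // Reassemble the value; the masks undo sign extension of the shorts.
    static int combine(short key, short low) {
        return ((key & 0xFFFF) << 16) | (low & 0xFFFF);
    }

    public static void main(String[] args) {
        int value = 0x0003FFFE; // container 3, position 65534 within it
        System.out.println((key(value) & 0xFFFF) + " / " + (low(value) & 0xFFFF));
        System.out.println(combine(key(value), low(value)) == value);
    }
}
```

Java has no unsigned short, so the shorts must be masked with `0xFFFF` whenever they are compared or widened.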

The implementation of the container covering a disjoint subset is free to vary between RunContainer, BitmapContainer and ArrayContainer, depending entirely on properties of the subset. When inserting data into a RoaringBitmap, it is decided whether to create a new container, or to mutate an existing container, depending on whether the values fit in the range covered by the container’s key. When performing a set operation, for instance by intersecting two bitmaps or computing their symmetric difference, a new RoaringBitmap is created by performing operations container by container, and it is decided dynamically which container implementation is best suited for the result. For cases where it is too difficult to determine the best implementation automatically, the method runOptimize is available to the programmer to make sure.
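One way to picture the choice of container is as a comparison of approximate storage costs. The sketch below is a simplification under my own assumptions, not the library's actual decision logic: it charges two bytes per value for an array, a fixed 8192 bytes for a bitmap, and four bytes per run for a run container, then picks the cheapest. `ContainerChoice` and `smallest` are hypothetical names.

```java
final class ContainerChoice {

    enum Kind { ARRAY, BITMAP, RUN }

    // Picks the representation with the smallest approximate size in bytes.
    static Kind smallest(int cardinality, int numberOfRuns) {
        int arrayBytes = 2 * cardinality;   // one short per value
        int bitmapBytes = 8192;             // 2^16 bits, regardless of content
        int runBytes = 4 * numberOfRuns;    // two shorts per run
        if (runBytes < Math.min(arrayBytes, bitmapBytes)) {
            return Kind.RUN;
        }
        return arrayBytes <= bitmapBytes ? Kind.ARRAY : Kind.BITMAP;
    }

    public static void main(String[] args) {
        System.out.println(smallest(10, 10));       // few scattered values
        System.out.println(smallest(60000, 2));     // two long runs
        System.out.println(smallest(30000, 15000)); // dense but fragmented
    }
}
```

Counting runs costs a pass over the data, which is one plausible reason for leaving run optimisation to an explicit call such as runOptimize.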

When querying a RoaringBitmap, the query can be executed container by container (which incidentally makes the query naturally parallelisable, but it hasn’t been done yet), and each operation must be implemented separately for every pair of container implementations. This is manageable because there are only three implementations, and there won’t be any more. There is less work to do for symmetric operations, such as union and intersection, than for asymmetric operations such as contains.

#### RunContainer

When there are lots of clean words in a section of a bitmap, the best choice of container is run length encoding. The implementation of RunContainer is simple and very compact. It consists of an array of shorts (not ints, the most significant 16 bits are in the key) where the values at the even indices are the starts of runs, and the values at the odd indices encode the lengths of the respective runs (stored as the length minus one, so that a run covering the whole container still fits in 16 bits). Membership queries can be implemented simply using a binary search over the run starts. Quantile queries and container cardinality both require a pass over the run array, which is cheap when the number of runs is small.
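A minimal sketch of the run layout, written from the description above rather than copied from the library (`RunsSketch` is a hypothetical name). It assumes the interleaved array holds (start, length minus one) pairs sorted by start, and shows membership by binary search alongside cardinality by a full pass.

```java
final class RunsSketch {

    // Interleaved (start, length - 1) pairs, sorted by start; all unsigned 16 bit.
    private final short[] runs;

    RunsSketch(short... runs) {
        this.runs = runs;
    }

    private static int unsigned(short s) {
        return s & 0xFFFF;
    }

    // Binary search over the runs for one that covers the value.
    boolean contains(int low) {
        int lo = 0;
        int hi = runs.length / 2 - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            int start = unsigned(runs[2 * mid]);
            int end = start + unsigned(runs[2 * mid + 1]); // inclusive
            if (low < start) {
                hi = mid - 1;
            } else if (low > end) {
                lo = mid + 1;
            } else {
                return true;
            }
        }
        return false;
    }

    // Cardinality is not stored, so every run length must be visited.
    int cardinality() {
        int total = 0;
        for (int i = 1; i < runs.length; i += 2) {
            total += unsigned(runs[i]) + 1;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two runs: 10..14 and the single value 100.
        RunsSketch sketch = new RunsSketch(new short[] {10, 4, 100, 0});
        System.out.println(sketch.contains(14) + " " + sketch.contains(15) + " " + sketch.cardinality());
    }
}
```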

#### ArrayContainer

When data is sparse within a section of the bitmap, the best implementation is an array (short[]). For very sparse data, this isn’t theoretically optimal, but for most cases it is very good and the array for the container will fit in L1 cache for mechanical sympathy. Cardinality is very fast because it is precomputed, and operations would be fast regardless of their precise implementation by virtue of the small size of the set (that being said, the actual implementations are fast). As an ArrayContainer fills up, it often becomes necessary to convert it to a BitmapContainer for better compression.
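Here is a rough sketch of the idea, with hypothetical names and none of the library's optimisations: a sorted short[] with a stored cardinality, binary search for membership, and a conversion to a bitmap. The threshold of 4096 is, as far as I know, the one the library uses; it is the point at which 4096 values of two bytes each cost the same 8192 bytes as a full bitmap.

```java
import java.util.Arrays;

final class ArraySketch {

    static final int MAX_ARRAY_SIZE = 4096; // assumed break-even point with a bitmap

    private short[] values = new short[4];
    private int cardinality;

    // Binary search with unsigned comparison; returns -(insertion point) - 1 if absent.
    private int search(int low) {
        int lo = 0;
        int hi = cardinality - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            int value = values[mid] & 0xFFFF;
            if (value < low) {
                lo = mid + 1;
            } else if (value > low) {
                hi = mid - 1;
            } else {
                return mid;
            }
        }
        return -(lo + 1);
    }

    boolean contains(int low) {
        return search(low) >= 0;
    }

    // Inserts in sorted position, growing the array when it is full.
    void add(int low) {
        int index = search(low);
        if (index >= 0) {
            return; // already present
        }
        int insertAt = -index - 1;
        if (cardinality == values.length) {
            values = Arrays.copyOf(values, values.length * 2);
        }
        System.arraycopy(values, insertAt, values, insertAt + 1, cardinality - insertAt);
        values[insertAt] = (short) low;
        ++cardinality;
    }

    int cardinality() {
        return cardinality; // stored, not recomputed
    }

    boolean shouldConvertToBitmap() {
        return cardinality > MAX_ARRAY_SIZE;
    }

    // Sets one bit per stored value in a 1024 word bitmap.
    long[] toBitmap() {
        long[] words = new long[1024];
        for (int i = 0; i < cardinality; ++i) {
            int value = values[i] & 0xFFFF;
            words[value >>> 6] |= 1L << value; // shift distance is taken modulo 64
        }
        return words;
    }
}
```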

#### BitmapContainer

BitmapContainer is the classic implementation of a bitset. There is a fixed-length long[] which should be interpreted bitwise, and a precomputed cardinality. Operations on BitmapContainers tend to be very fast, despite typically touching each element in the array, because they fit in L1 cache and make extensive use of Java intrinsics. If you find a method name in here and run your JVM on a reasonably modern processor, your code will quickly get optimised by the JVM, sometimes even to a single instruction. A much-hackneyed example, explained better by Michael Barker quite some time ago, would be Long.bitCount, which translates to the single instruction popcnt and has various uses when operating on BitmapContainers. When intersecting with another container, the cardinality can only decrease or remain the same, so there is a chance an ArrayContainer will be produced.
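To make the role of Long.bitCount concrete, here is a small sketch of a bitmap intersection, with hypothetical names and plain long[] arrays of 1024 words standing in for the container. The intersection is a word-by-word AND, and the cardinality of the result is the sum of the bit counts of its words.

```java
final class BitmapSketch {

    static final int WORDS = 1 << 10; // 1024 longs * 64 bits = 2^16 positions

    static long[] of(int... lows) {
        long[] words = new long[WORDS];
        for (int low : lows) {
            words[low >>> 6] |= 1L << low; // shift distance is taken modulo 64
        }
        return words;
    }

    static boolean contains(long[] words, int low) {
        return (words[low >>> 6] & (1L << low)) != 0;
    }

    // Word-by-word intersection.
    static long[] and(long[] left, long[] right) {
        long[] result = new long[WORDS];
        for (int i = 0; i < WORDS; ++i) {
            result[i] = left[i] & right[i];
        }
        return result;
    }

    // Long.bitCount is a JVM intrinsic, typically compiled to popcnt.
    static int cardinality(long[] words) {
        int total = 0;
        for (long word : words) {
            total += Long.bitCount(word);
        }
        return total;
    }

    public static void main(String[] args) {
        long[] both = and(of(1, 64, 65535), of(7, 64, 65535));
        System.out.println(cardinality(both));
    }
}
```

A result this small is the case where converting the outcome to an array would pay off.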

#### Examples

There is a really nice Scala project on GitHub which functions as a DSL for creating RoaringBitmaps – it allows you to create an equality encoded (see my previous bitmap index post) RoaringBitmap in a very fluid way. The project is here.

I have implemented bit slice indices, both equality and range encoded, in a data quality tool I am building. That project is hosted here. Below is an implementation of a range encoded bit slice index as an example of how to work with RoaringBitmaps.

```java
public class RangeEncodedOptBitSliceIndex implements RoaringIndex {

    private final int[] basis;
    private final int[] cumulativeBasis;
    private final RoaringBitmap[][] bitslice;

    public RangeEncodedOptBitSliceIndex(ProjectionIndex projectionIndex, int[] basis) {
        this.basis = basis;
        this.cumulativeBasis = accumulateBasis(basis);
        this.bitslice = BitSlices.createRangeEncodedBitSlice(projectionIndex, basis);
    }

    @Override
    public RoaringBitmap whereEqual(int code, RoaringBitmap existence) {
        RoaringBitmap result = existence.clone();
        int[] expansion = expand(code, cumulativeBasis);
        for (int i = 0; i < cumulativeBasis.length; ++i) {
            int component = expansion[i];
            if (component == 0) {
                result.and(bitslice[i][0]);
            } else if (component == basis[i] - 1) {
                result.andNot(bitslice[i][basis[i] - 2]);
            } else {
                result.and(FastAggregation.xor(bitslice[i][component], bitslice[i][component - 1]));
            }
        }
        return result;
    }

    @Override
    public RoaringBitmap whereNotEqual(int code, RoaringBitmap existence) {
        RoaringBitmap inequality = existence.clone();
        inequality.andNot(whereEqual(code, existence));
        return inequality;
    }

    @Override
    public RoaringBitmap whereLessThan(int code, RoaringBitmap existence) {
        return whereLessThanOrEqual(code - 1, existence);
    }

    @Override
    public RoaringBitmap whereLessThanOrEqual(int code, RoaringBitmap existence) {
        final int[] expansion = expand(code, cumulativeBasis);
        final int firstIndex = cumulativeBasis.length - 1;
        int component = expansion[firstIndex];
        int threshold = basis[firstIndex] - 1;
        RoaringBitmap result = component < threshold ? bitslice[firstIndex][component].clone() : existence.clone();
        for (int i = firstIndex - 1; i >= 0; --i) {
            component = expansion[i];
            threshold = basis[i] - 1;
            if (component != threshold) {
                result.and(bitslice[i][component]);
            }
            if (component != 0) {
                result.or(bitslice[i][component - 1]);
            }
        }
        return result;
    }

    @Override
    public RoaringBitmap whereGreaterThan(int code, RoaringBitmap existence) {
        RoaringBitmap result = existence.clone();
        result.andNot(whereLessThanOrEqual(code, existence));
        return result;
    }

    @Override
    public RoaringBitmap whereGreaterThanOrEqual(int code, RoaringBitmap existence) {
        RoaringBitmap result = existence.clone();
        result.andNot(whereLessThan(code, existence));
        return result;
    }
}
```
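The three cases in whereEqual are easier to follow on a single component. The sketch below is a simplified stand-in under my own assumptions: it uses java.util.BitSet instead of RoaringBitmap, a single base rather than a multi-component basis, and hypothetical names throughout. Slice v holds the rows whose value is at most v, and the top slice is omitted because it would equal the existence bitmap.

```java
import java.util.BitSet;

final class RangeEncodingSketch {

    // slices[v] holds the rows whose value is <= v, for v in [0, base - 2].
    static BitSet[] build(int[] column, int base) {
        BitSet[] slices = new BitSet[base - 1];
        for (int v = 0; v < base - 1; ++v) {
            slices[v] = new BitSet(column.length);
            for (int row = 0; row < column.length; ++row) {
                if (column[row] <= v) {
                    slices[v].set(row);
                }
            }
        }
        return slices;
    }

    static BitSet whereEqual(BitSet[] slices, int base, int value, BitSet existence) {
        BitSet result = (BitSet) existence.clone();
        if (value == 0) {
            result.and(slices[0]);               // <= 0 is the same as == 0
        } else if (value == base - 1) {
            result.andNot(slices[base - 2]);     // everything not <= base - 2
        } else {
            BitSet difference = (BitSet) slices[value].clone();
            difference.xor(slices[value - 1]);   // <= value but not <= value - 1
            result.and(difference);
        }
        return result;
    }

    static BitSet whereLessThanOrEqual(BitSet[] slices, int base, int value, BitSet existence) {
        BitSet result = (BitSet) existence.clone();
        if (value < base - 1) {
            result.and(slices[value]);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] column = {0, 2, 1, 2, 3};
        BitSet[] slices = build(column, 4);
        BitSet existence = new BitSet();
        existence.set(0, column.length);
        System.out.println(whereEqual(slices, 4, 2, existence)); // rows holding the value 2
    }
}
```

With a multi-component basis, the same three cases are applied to each component of the expanded code and the results are intersected, which is what the loop in whereEqual does.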