Phi Fi Foe Fun, I smell the blood of a dead Cassandra node

One thing Cassandra does fairly well is direct traffic to nodes that are less busy or have better throughput. It will even mark nodes as “down” if they aren’t reporting gossip as often as they should.

The latter part is done by the FailureDetector. The FailureDetector is stateless, whereas the gossiper has a set of states you might already be familiar with. The gossiper uses the FailureDetector as a way of knowing that a node just isn’t reporting in often enough.

There are two primary methods on the IFailureDetector. The first one is report, which is what the gossiper calls every time a message is received from that node. A message is supposed to arrive every second from the Gossiper. The failure detector will record the time between messages for the last 1000 messages, which should be about 16 minutes, 40 seconds of history. It will not record the time if the last message was sent more than 2 seconds ago.

With this information at hand, the gossiper does a status check every second as well (usually). It then calculates this magical “phi” value (pronounced “fee”) which is the time since the last message divided by the average time of the collected samples. It then divides by a constant, PHI_FACTOR, which is about 0.434. If that is above phi_convict_threshold, then the failure detector considers the node “down”.

The average collected samples must average less than 2, and rarely falls below 1, since items above 2 are discarded and we only send messages every second. A message delayed just over 2 seconds might be right behind another that was sent less than a second ago, and we’ll only record the second sample, which can cause the number to drop lower than 1. I doubt this was the intended behavior, but it’s how it works and can be seen in testing. For most networks, even with some jitter, you should see an average value of about 1.

The upshot is that your phi value says how long it’s been since the last sample. If that’s (by default) about 18.4 seconds, your node is down. That means in the default configuration, a node is considered down if you don’t get a gossip ping in about 18.4 seconds, if you usually get one every second. The 18.4 second value came from the default phi_convict_threshold of 8, divided by PHI_FACTOR of 0.434.

There’s one more thing to consider, though, and that is that if the FailureDetector decides that a node should be “convicted”, it calls a callback, but the callback can choose to do nothing. That happens in two places in the code. If you’re streaming, or running a repair, the node is still up unless your phi value is 100 times the convict threshold, or 800 with a default phi_convict_factor. That math is done without any adjustment by PHI_FACTOR, and the 100 is not tunable. This means that, by default, you have about 13 minutes, 20 seconds before a node is considered down.

This, of course, is adjustable. The documentation says to increase phi_convict_factor when you’re on AWS to 12. If you set it this high, you instead have about 1200 seconds where gossip doesn’t need to arrive, or about 20 minutes, for repairs or streaming.

Monitoring Phi

Unfortunately, there are no good ways of monitoring phi, unless you exceed it, in which case the node is marked down. I wanted some way to know if we’re close, so I added some code in CASSANDRA-9526 to expose it via JMX, and added some code in the latest version of the Cassandra Statsd Agent to send that out to our DataDog system for monitoring. — - Ron Kuris

posted in: · · ·

Measuring Kafka consumer delay with Verspätung

Sometime in mid-2014, we started to roll out Apache Kafka as a replacement to our older ActiveMQ messaging infrastructure. While we were evaluating and deploying it, we became enamoured with its robustness and clean architectural design. Many of the problems with our previous infrastructure would no longer exist in the brave new Kafka-based world.

Note: if you are unfamiliar with Kafka’s design, our very own Brandon Burton wrote up a good intro to Kafka for last year’s SysAdvent.

One key principle with Kafka’s design, is that consumers store an offset into the commit log for a topic/partition. This means that for a single topic (e.g. device_events) I can easily have hundreds, if not thousands, of consumers with no additional storage requirements on the Kafka cluster, since each new consumer only needs to store a single integer offset.

A drawback of this approach is that understanding how “behind” a single consumer group might be from the “latest” for a Kafka topic can be difficult to discover. Most consumers use Zookeeper by default for storing their offsets, while Kafka topics store their “latest offset” inside of Kafka itself.

Unable to find a suitable solution to track and report on this information, I embarked on writing a new daemon to help: Verspätung (the German word for delay or lateness)

Meet Verspätung

Verspätung is a Groovy based daemon which implements a few key behaviors:

  • Subscribes to Zookeeper subtrees for updates
  • Periodically polls Kafka brokers using the Kafka meta-data protocol
  • Exposes offset deltas to be consumed by metrics systems (e.g. Datadog.

Watching Zookeeper

Argubly one of the most important features of Verspätung is its ability to “watch” an entire Zookeeper subtree for updates. This means that Verspätung doesn’t need an ahead-of-time knowledge of what consumers might be coming or going. When nodes are added to the Zookeeper subtree, Verspätung will receive an event and register a metric for that consumer.

Implementing this turned out to be far easier than I had anticipated thanks to the wonderful Curator library, and its TreeCache recipe. The implementation for this logic can be found in the “TreeWatcher” files in the source tree.

This information only gives us a part of the puzzle, the currently committed offset for a consumer group, we still need to get the latest from Kafka.

Polling Kafka

The best way to get data from Kafka, is to speak Kafka! Therefore Verspätung also includes a dependency on the Kafka client library.

After studing the code behind Kafka’s GetOffsetShell I implemented Verspätung’s KafkaPoller whose sole purpose in life is to check in with Kafka and get updated metadata every now and again.

I wasn’t able to find any means of getting an evented stream of this information, so the KafkaPoller will ask Kafka every second (currently hard-coded) for the latest offsets on all topics and partitions.

Regardless of whether we have active consumers for these topics or not, all this data is recorded internally to Verspätung.

With the second piece of the puzzle in place, we finally need to start reporting the deltas!

Publishing Metrics

Verspätung uses another fantastic library, dropwizard-metrics, for publishing metrics on Kafka consumer delays.

When a new consumer is discovered in Zookeeper, Verspätung will register a new gauge to its internal MetricsRegistry. The gauge in term will consult our internal data structures for computing the delta only when asked by the reporter.

Due to this design, the retrieval of offsets from both Kafka and Zookeeper is completely decoupled from the recording of metrics. This allows for a very granular time-period for reporting metrics or a very coarse one depending on your needs.

Delay for a production consumer over 24 hours

Currently by default, Verspätung will use our forked metrics-datadog reporter which allows for dynamic per-Gauge tags.

Verspätung then reports a few different tags to Datadog, namely “consumer-group’, “partition” and “topic” which allows us to cut and slice the metrics to fit our needs in the Datadog interface.

Los geht’s!

There’s still some outstanding issues with Verspätung but we’ve been running v0.2.0 in pre-production and production environments without issues for over a month now.

We hope you find the tool useful!

posted in: · · ·

Lookout at FOSDEM 2015

Every year, thousands of open source hackers and enthusiasts flock to Brussels in Belgium for FOSDEM, the Free and Open Source Developers European Meeting. In short, FOSDEM is a place for thousands of self-organizing and passionate open source hackers to share code, tools and ideas while ingesting thousands of liters of Club-Mate and beer.

For the past few years I have personally been involved in organizing the “Testing and Automation” devroom, which will be on the schedule again this year. Never satisfied with a relaxing day at FOSDEM, this year I’m also helping to organize the first Ruby devroom at FOSDEM (disclaimer: there was a Ruby + Rails devroom from 2008-2010).

As Lookout is getting more involved in JRuby, I’m happy to say that both Christian Meier and myself will be in attendance for the entirety of the event.

I’m personally very excited that Lookout is going to be present for the second consecutive year at FOSDEM and that the conference continues to have an incredible amount of great technical content.

If you’re going to be at FOSDEM and are interested in meeting up and talking about Lookout, Ruby, Test/Automation or how stinkin’ cold it is outside, ping me or @LookoutEng on Twitter.

Hope to see you there!

posted in: · · ·