How Heroku Operates its Multi-Tenant Apache Kafka Services

This blog post is adapted from a talk given by Ali Hamidi at Data Council SF '19 titled "Operating Multi-Tenant Kafka Services for Developers on Heroku."

Hi. Welcome to Operating Multi-Tenant Kafka Services for Developers. This is the agenda for the talk. I'm going to give you a little intro about myself and Heroku and Heroku Data. We're going to look at the motivation behind building the multi-tenant Kafka service in general, take a look at our existing single-tenant Kafka, compare that with multi-tenancy and what the multi-tenancy implications are, and then we'll go into some of the configuration changes that we made and some of the tuning that we did. Talk a little bit about testing and then hopefully cover automation, which is kind of our thing, and discuss some of the limitations that were necessary as a result of making a multi-tenant service.

I am Ali Hamidi. I am an engineer on the Heroku Data team at Salesforce. Heroku is a cloud platform that enables companies to build applications in a variety of languages, deploy them on our platform, and then run, scale, and monitor them. Heroku Data specifically is the team at Heroku that provides the data services on top of the platform. We provide Heroku Postgres, Heroku Redis and, of course, Kafka.

A brief word about Kafka. Hopefully many of you are aware of it already. Kafka itself is an Apache project. It's a distributed streaming platform. It provides the pub/sub model; in Kafka parlance it's produce and consume, so you produce to the cluster and you consume from it. It's a durable message store, so on disk it's an ordered commit log, and it's highly available. You have multiple brokers that can hold multiple replicas of your data. If a single broker goes down, you still have replicas, it will fail over, and you continue.

What is Apache Kafka on Heroku specifically? We provide a fully managed service, so it's more of a batteries included approach. It's opinionated in the sense that in cases where there are multiple configuration options we make a decision based on what we feel is a safe or productive default and we provide that out of the box.

It's configured for best practices for most users. I put a little asterisk there because we can't make one size fit all. There are bound to be use cases that aren't really well-suited, but we try to cover as many as possible and get the largest coverage we can.

I want to talk a little bit about the use cases for Kafka in general. Why would you use Kafka and not some other data store or other streaming service? I'm going to cover a couple of use cases and then look at specific customers that were on Heroku that used Heroku Kafka.

One of the ones I'm going to look at is decomposing a monolithic app. In this case we're taking a big monolith and we're using Kafka as a transport medium or interconnect between the various services or microservices to transport information and events between them.

Another use case that I'm going to cover is processing high volume, real-time data streams. You may have something that is naturally a stream of events, like currency information or events coming from an IoT device or whatever it is, and you just have a torrent of data that you're collecting from various sources and you want to make real-time decisions on those.

The third use case I'm going to cover is providing real-time event-driven architecture. This is pretty similar to the second case, but essentially you have a number of disparate services that are producing events and you're trying to make decisions on some kind of aggregate of that data. You have a number of sources, they're all telling you different things and you want to make real-time decisions on what you should be doing.

For the first case, decomposing the monolith, I'm going to talk a little bit about SHIFT Commerce. You can actually read about it, they have a blog post on the Heroku blog that talks about it. They basically leveraged Heroku Kafka as a path to gradually migrate from a monolith to microservices. Essentially what they did was they introduced the ability for their monolith to produce events into Kafka and then gradually over time they introduced more and more services that consume those events and moved the logic out of the monolith into those new microservices.

For the second use case I wanted to talk a little bit about Quoine, it's pronounced coin. Quoine is a fintech company based out of Singapore and they mostly deal with cryptocurrency exchanges and mostly active in Japan. Essentially they have a huge torrent of pricing data for cryptocurrencies coming from a number of different exchanges and they aggregate them and make decisions in real-time on what to do and how the prices affect their decisions.

The last use case I want to cover is Caesars Entertainment of casino fame and other things. Essentially they use it to ingest and aggregate a large amount of customer data from lots and lots of different sources and they use it to make decisions on how best to provide services or offers or deals to their most prized VIP customers. These are very different systems, all producing events. They aggregate, they process them, they try to make decisions on how best to serve their customers.

I want to talk a little bit about the motivation, why bother building a multi-tenant Kafka. We already have a relatively successful dedicated Kafka product. These are some of the motivations behind it, some of the goals. Right now our single-tenant Kafka starts at $1,500 for the base plan, which is standard-0. With that you get three brokers and five ZooKeeper nodes, which is pretty standard for our ZooKeeper topology.

$1,500 could be relatively reasonable for you, but we want to lower that barrier, we want to make it much easier to access. If you're working in development and you haven't really finished your product yet, that might be too high a barrier for you, and so targeting development, testing, and low-volume production, we want to introduce something that's a little bit more accessible. Our multi-tenant plans right now start at $100, so that's quite a substantial reduction.

Here's the blog post when we announced it. It's September 14, 2017. We've had multi-tenant Kafka for quite a while. I want to talk a little bit about the differences, single-tenant versus multi-tenant and what it looks like. Single-tenant is pretty obvious. We have one customer or one tenant that has access to the entire cluster and so your security boundaries or your resource boundaries are really outside of the cluster, everything in the cluster is yours. This is really what it looks like. When you talk Kafka you're really dealing with topics. All of the topics are owned by a single customer, you don't really need to worry about any isolation at that level.

With multi-tenancy it kind of looks like this. We have a large number of customers all residing on a single Kafka cluster. That introduces a whole range of new issues that you need to tackle. I know this is kind of a wall of text but I'm going to break it down and talk about each one separately.

We talk about resource isolation, we talk about parity, compatibility and costs related to it. I'm going to focus on resource isolation first and talk about security implications, performance implications, and safety. When I talk about security, what I really mean is a tenant should not be able to access another tenant's data. This is kind of table stakes for multi-tenancy. It would be kind of a disaster if we put everyone on the same cluster and everyone could read everyone else's data. The priority, the main thing, is this should not happen. You shouldn't be able to read other people's data. With Kafka, because we actually talk about topics all the time, this is really what we mean. User A shouldn't be able to read User B's topics.

How do we do that? Kafka itself has native support for access control lists, or ACLs. It's a relatively new feature in Kafka's life, but essentially what it allows you to do is specify that User A can carry out action B on resource C. User A can read, describe, or write the topic "messages", which is owned by User A, but there is no explicit ACL allowing it to read anything of User B's, so it can't.

The other thing we do is namespacing. This is something that's somewhat unique to us in that we namespace resources per tenant. When you create a new multi-tenant Kafka add-on on Heroku, we automatically generate a name, in this case wabash-58779. We pick a river, dash, some numbers, and that becomes the prefix for all of your resources. You create the topic "messages" and you get wabash-whatever-it-is dot messages. That also applies to the consumer groups, and that's actually what we apply the ACLs to. Any topic that begins with wabash-whatever is owned by tenant A. Anything that has a different prefix is owned by another tenant.
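To make the namespacing concrete, here's a minimal sketch of what it looks like from a client's point of view, using the kafka-python library. The environment variable names, broker address, and prefix value are illustrative assumptions, and the SSL configuration that Heroku actually requires is omitted for brevity.

```python
# Minimal sketch: producing to a namespaced topic with kafka-python.
# KAFKA_PREFIX, the broker address, and the prefix value are assumptions
# for illustration; real connections also require SSL client certificates.
import os
from kafka import KafkaProducer

prefix = os.environ.get("KAFKA_PREFIX", "wabash-58779.")  # tenant namespace

producer = KafkaProducer(bootstrap_servers="broker-1.example.com:9096")

# The application thinks in terms of a topic called "messages"; the cluster
# sees "wabash-58779.messages", and the tenant's ACLs only cover topics and
# consumer groups that carry this prefix.
producer.send(prefix + "messages", b"hello from one tenant")
producer.flush()
```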

The second aspect that I want to look into is performance and what I mean here is really this: a tenant should not adversely affect another tenant's performance. This is typically what we refer to as the noisy neighbor. You're sharing a cluster with someone, they're super active, they're absolutely hammering their broker and your performance goes down the toilet as a result of it. Obviously that sucks for everyone, except for the person who's abusing the resources, and so we want to prevent that.

In the case of Kafka, Kafka has support for quotas. That's super cool, it's really useful. It allows you to specify a produce quota and a consume quota in terms of bytes per second. We say Tenant A can produce at 256 kilobytes per second and consume at 1 megabyte per second, and Kafka itself enforces that. I'll talk a little bit later about the way that's enforced because it's kind of interesting.

The third part of resource isolation is safety. Here what I actually mean is a tenant should not jeopardize the stability of the cluster. We don't want a single tenant who does something unusual or has a misbehaving client to take down the cluster for everyone else.

In order to prevent that we apply limits. We can apply limits to topics, and by that I mean the number of topics, and by association the number of partitions, which is really the thing that scales on Kafka. The number of consumer groups, storage, and throughput.

I want to talk a little bit more about storage because it's kind of interesting. Kafka's concept of storage is more of a transient thing. Messages live on the Kafka cluster for a certain amount of time but they don't live there forever. It's not a source of truth for your data forever. Thinking of it in terms of static storage is a little bit strange, so instead we think of it as capacity, and this is a term that we use in all of our documentation. Essentially that's a function of the message throughput multiplied by the retention on all of those messages multiplied by the replication. If I do a hundred messages per second, I have a one-day retention, and I want three replicas of it, you multiply that all together and that's actually the total amount of storage you're using across the cluster. And then there's the throughput itself, which I covered with quotas.
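As a worked example of that capacity calculation (the average message size here is an assumption added to make the arithmetic concrete):

```python
# Capacity = throughput x retention x replication.
# Hypothetical tenant: 100 messages/sec of ~1 KB each, 1-day retention, 3 replicas.
messages_per_sec = 100
avg_message_bytes = 1_000            # assumed for illustration
retention_secs = 24 * 60 * 60        # one day
replication = 3

capacity_bytes = messages_per_sec * avg_message_bytes * retention_secs * replication
print(f"{capacity_bytes / 1e9:.1f} GB of storage across the cluster")  # ~25.9 GB
```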

Imposing these limits is cool. Some of them are natively supported within Kafka, so you can specify quotas or ACLs to do certain things. Storage is something that we covered with the capacity, which is a slightly more complicated function. But we need to be able to monitor it. We actually need to be able to look at this and say, are these limits being exceeded? Are they being adhered to?

In particular with storage, the way it works in our case is that we allow you to grow the storage on the cluster. Say your plan gives you 100 gigabytes of storage. You start putting data into it, and as you get close to the hundred gigabytes, we actually grow it automatically for you. We don't take down your cluster because you hit your limit, we have a pretty generous grace area where we will increase the storage and let you continue. That's great for the customer, but we aren't really doing you any favors by allowing you to grow unbounded, so we actually have to monitor it. We check occasionally to see, are you still within the limits? Are you nearing a dangerous limit? Monitoring is kind of key to that, but then more so than just monitoring is actually being able to enforce the limits. Some of them are nice because they're natively supported in Kafka, but in the case of storage, if our automation automatically grows the volumes then you could just keep growing forever and eventually the cluster would topple over, and so we actually have to have a mechanism for enforcing it.

In the case of Kafka multi-tenant, we'll send you an email saying, hey, you've exceeded your storage limit by 150%, you should probably slow it down or upgrade your plan. If you keep going past it then we impose an additional, more aggressive throttling to slow your growth and get you back under the limit. If you just fly past it for an extended period of time we actually block access to try to let it clear out and then we'll enable it again.
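That escalation can be sketched as a simple decision function. The thresholds and action names below are illustrative assumptions that paraphrase the behavior described above, not Heroku's actual automation:

```python
# Illustrative sketch of graduated storage enforcement; thresholds are assumptions.
def storage_enforcement_action(used_bytes: int, plan_limit_bytes: int) -> str:
    ratio = used_bytes / plan_limit_bytes
    if ratio < 1.0:
        return "ok"        # within the plan's capacity
    if ratio < 1.5:
        return "email"     # over the limit: warn the customer, keep growing the volume
    if ratio < 2.0:
        return "throttle"  # well over: aggressively throttle produce traffic
    return "block"         # far over for too long: block access until data ages out
```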

Another area of multi-tenancy which is specific to multi-tenancy is parity. Here we're talking about feature parity as well as behavior parity. I say parity, but what I really mean is for the service to be useful it needs to behave like a normal cluster. There's no point in us giving you something that looks and behaves in a way that's totally unlike Kafka because you could code against it and you could deploy your application against it and it works fine, but then you switch to regular Kafka and everything breaks and it just doesn't do what you think it does.

In order to address this, we actually give you access to what is effectively a standard Kafka cluster. It looks very much like our dedicated clusters, it's the same number of brokers, it's the same number of ZooKeeper nodes, it behaves in exactly the same way.

One of the things that we thought about is that we could have gone with a single broker, single ZooKeeper option and that would have been pretty cost-effective and we could do it quite easily, but then the customer would be developing against something that's not really representative of real life. Nobody would deploy a production Kafka cluster with one broker and one ZooKeeper and so you won't have exposure to things like rolling restarts or when topics failover and all that stuff.

Your application might work fine with only one, but then once you switch to a real cluster which has eight brokers and they start failing over and things start moving around, your application breaks. We actually give you access to something that represents what a real production cluster would look like, so if you do move then your application is ready for it. There are some limitations. We have to make some compromises in order to provide the level of safety that I mentioned earlier. As much as possible we try to keep it standard, but there's going to be some things that we have to change.

Compatibility is the next thing I want to talk about. In this case what I really mean is the service needs to support standard clients, no vendor lock-in. That's absolutely a core part of Heroku's belief and we only use open-source Apache Kafka. It's not a custom fork, there's no weird custom code required on the client side, there's nothing strange that we do in order to enable this. We use absolutely standard clients in whatever your favorite language is, and you can connect to it and use it. That's kind of a big thing for us and it applies to all of our data services really.

The last implication I want to talk about is costs, and specifically resource costs and operational costs. What does it cost to run the things that we run, like VMs and instances? Plus, what does it cost us to actually operate it? Here we're talking about the service needing to be financially feasible, and in this case I'm actually talking about our costs, not so much the cost for the customer, although they're obviously related.

Here, the things that go into the resource costs would be packing density. How many tenants do we pack on this cluster? Conversely, but somewhat related, is the utilization of that cluster. How well-utilized is it? The more utilized it is the more efficiently we're spending our money on these resources. But with that comes a few sort of interesting cases where we have consciously made a decision to take a hit on our part in order to provide certain functionality.

When we look at cluster size, we have some options. We could have one massive cluster with hundreds of brokers and pack all of the tenants onto there, or we could have very, very small clusters like one broker, one ZooKeeper. There are some decisions there where there are implications related to the way it performs, the way failovers work. There is a sweet spot for cluster size and we have actually already done this process. We've looked into this, we did all the testing when we looked at the single dedicated plans and we settled on a maximum of eight brokers and five ZooKeepers because of the way it behaves and so we kind of favor that.

We also insisted pretty early on that we don't over-provision. When we tell a customer you're signing up for a particular plan, we make sure that they have all those resources and so we don't over provision in any way. Again, if we say we're going to pack 20 customers onto a cluster and they all join on with the lowest plan, then our utilization is pretty low. That sucks from a resource cost but it's great from the customer point of view because now they have the resources that they expected.

Also we wanted to provide seamless upgrading. The idea is a customer can join on with the lowest plan and they should be able to upgrade to the next plan up or the biggest plan that we have for multi-tenant without any disruption. Unfortunately when it comes to Kafka there is no easy way to actually migrate offsets. If you move from one cluster to the other, the offsets aren't really translatable in that sense. In order to get around this, we have to assume the worst case or the best case, depending on your perspective, that every customer could potentially upgrade to the biggest plan and so inherently our utilization is pretty low.

When it comes to operational costs, this is kind of an area that we want to focus on because this is the amount of man-hours that we sink into it. We're not a huge team so this kind of makes a big difference for us. We want to minimize the operational burden and minimize the impact or the blast radius of particular issues. Again, going back to the cluster size, if we had one massive cluster with a hundred brokers and there was a cluster-wide issue, then that could potentially affect hundreds or thousands of customers. Again, we fall back into the sweet spot of saying, if we have clusters that are eight brokers and we pack 25, 30 tenants onto it, then we can kind of minimize the blast radius into a particular area.

Minimizing operational costs, again, if we match the single-tenant clusters in topology, then we can actually share the learnings between them. All of the knowledge and experience gained from operating a fleet of three- and eight-broker clusters maps over immediately, so if we find a bug in one topology, or one behavior, or one performance tuning that works on one, we can immediately map it over to the multi-tenant fleet without really any loss.

Some of the other ways that we kind of reduced the operational burden for us is that we picked safe defaults. It's possible to put yourself in an unfortunate situation by picking settings that are not ideal. Think about the situation where you create a topic and you set replication to one, which was the default in lots of clusters. The customer creates a topic, they start writing things to it, and then that broker fails over: well, what happened to the data? It only resided on one broker and so now it's gone forever.

That sucks for the customer and maybe they weren't aware of it or maybe they assumed that a sane default was used, so we actually enforce that. We picked those safe defaults both to provide a better user experience but also to reduce the operational burden for us.

As I mentioned, we pick similar clusters that are dedicated so we can share those learnings and we do a ton of automation. Automation, it's kind of our thing, that's like an epic understatement. We are a very small team and we manage millions of databases and data services. Millions, without exaggeration, that's actually what we do.

We did a ton of testing. We did a huge amount of testing on the multi-tenant topologies, different size Kafka clusters, different configurations, different releases, all sorts of stuff in order to arrive at a configuration that we think is pretty sound and pretty solid and that all feeds back into reducing operational costs for us.

I want to talk a little bit about configuration and tuning and sort of talk a little bit about some of the changes that we made, some of the configurations that we did in order to achieve what we aimed to do and what we actually have running in our multi-tenant fleet.

I'm going to cover partitions, the number of partitions, what they look like. Quotas, how those work. Topics and consumer groups specifically, how they're created, and the idea of guardrails. This is something that we implement to make it better for customers to run on Heroku.

Partitions. We have tested and settled on a very, very large maximum number of partitions for Kafka. For the most part the industry average is maybe one or two thousand partitions per broker, that's kind of like the rule of thumb. We are at close to 6,000, which is unusually high, but this is kind of the trade-off that we make for being able to pack so many tenants and give them the freedom to create the number of topics that they want or need and the number of partitions per topic that they need.

In order to support a large number of partitions, we actually have to tune the maximum number of file descriptors, and so all of our clusters run at 500,000, which again is pretty atypical, it's unusually high for brokers of this size, but through our extensive testing we've arrived at this number and we know it's a safe number for these clusters. Similarly, we've increased the maximum memory map (mmap) count to 500,000. If you look at most Kafka docs they'll tell you to increase it, but to what is kind of up in the air; it really depends on your usage.
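If you want to check whether a broker host is actually running with limits in that ballpark, a small sketch like this (run on the broker itself, Linux assumed) will show you the current values:

```python
# Check the process file-descriptor limit and the kernel mmap count (Linux).
import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"file descriptors: soft={soft}, hard={hard}")   # tuned to ~500,000 here

with open("/proc/sys/vm/max_map_count") as f:
    print(f"vm.max_map_count: {f.read().strip()}")     # also raised to ~500,000
```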

I want to cover quotas a little bit because this is kind of an unusual implementation. KIP-13 introduced quotas in 0.9, and something that's kind of counterintuitive, both in the enforcement and the way it's implemented in the first place, is that quotas are actually implemented per broker. If you think about this, if I set a one megabyte per second produce quota on Kafka using the CLI commands, what I'm actually getting is 1 megabyte per broker. If I think, oh, I've given my customer 8 megabytes across my eight brokers, and then I double the size of my cluster, I've actually now given them 16 megabytes. It's not really calculated across the cluster, and if you look at the documentation they basically say it's easier to implement it this way. The quotas we document for our plans are actually the cluster-wide aggregate.
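A quick bit of arithmetic makes the per-broker behavior clear; the broker counts and rates below are just the numbers from the example above:

```python
# Quotas are enforced per broker, so the cluster-wide aggregate scales with broker count.
brokers = 8
per_broker_produce_quota = 1 * 1024 * 1024          # "1 MB/s" set via the quota tooling

cluster_wide = per_broker_produce_quota * brokers    # 8 MB/s aggregate on 8 brokers
after_doubling = per_broker_produce_quota * 16       # silently becomes 16 MB/s on 16

# Conversely, to advertise 256 KB/s of produce for a tenant on an 8-broker
# cluster, the per-broker quota has to be set to 256 KB / 8 = 32 KB/s.
advertised = 256 * 1024
per_broker_setting = advertised // brokers
print(cluster_wide, after_doubling, per_broker_setting)
```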

The enforcement for produce quotas is super counterintuitive, and it's actually called out a few times, but the way it works is that when you produce to a Kafka broker it sends you back a response saying, okay, I got it, it's been written. When the quota is in effect and you've exceeded your quota, it actually does all the work. It accepts your message, it writes it to the commit log, it sends it off to all the brokers that are replicating, but then it just delays its response to you.

The idea is this preserves backwards compatibility. Even if your client doesn't know anything about quotas it should, if it's a well-behaved client, wait for the ack before it tries to produce more. In reality, many Kafka clients are not well written or don't really adhere to this behavior and they'll just keep throwing more data at it, and the Kafka broker will happily accept it. It will just keep accepting as much data as you send it and just won't respond to you, but if your client doesn't care it will just keep going, and so you end up with this sort of honor-based implementation of quotas. It's like saying you really shouldn't send me anything more, and then you just do, and it just accepts it. This actually led to another issue that we uncovered, but I'll cover that in a little bit. So yeah, if you are using Kafka and you're already using produce quotas, that's something to keep in mind.
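If you're writing a producer against a quota-enforced cluster, the practical takeaway is to make the client block when the broker is slow to acknowledge, rather than buffering without bound. Here's a hedged sketch with kafka-python; the parameter values, topic name, and broker address are illustrative:

```python
# Sketch: a producer that waits for acks and bounds its local buffer, so a
# throttled (slow-to-respond) broker applies backpressure to the application
# instead of the client piling up unacknowledged data. Values are illustrative.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="broker-1.example.com:9096",
    acks="all",                        # wait for in-sync replicas to confirm
    buffer_memory=32 * 1024 * 1024,    # cap on buffered, unacknowledged data
    max_block_ms=60_000,               # send() blocks when that buffer is full
)

for i in range(1_000):
    producer.send("wabash-58779.messages", f"event-{i}".encode())
producer.flush()  # block until everything has been acknowledged
```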

With topics and consumer groups, this is kind of an unfortunate necessity in the way we implemented multi-tenancy, in that there is no way in Kafka, even with ACLs, to specify a numeric limit on the number of topics. We can't say User A can create 40 topics and 200 partitions; the ACLs don't have that kind of granularity. In order for us to impose those limits, we actually split the control plane between Kafka and Heroku.

The idea is we removed the ability to create topics dynamically through the admin APIs and instead we force the customer to go through the Heroku API in order to create these topics. That gives us the gateway, the entry point, in order to enforce these limits. For the most part that's not really an issue. Most of our users wish that they could dynamically create topics and consumer groups, but in reality it's not really a blocker in most cases.

The last sort of major topic in configuration that I want to cover is guardrails. The idea here is to limit potential bad usage. Customers may use clients that are poorly written because their language doesn't have a great supported client or they may mis-configure the client in a way that introduces bad behavior that could cause instability or other performance issues.

There's this quote which I've heard many times at Heroku, but I'm actually not sure who said it initially, but essentially it's, "Customers don't make mistakes, we make bad tools." The idea is if we give a customer the opportunity to make a mistake and behave badly then it's actually on us. We shouldn't have let them do that. Wherever possible we try to build in these guardrails. We try to put things in place to prevent customers from making mistakes, essentially.

One of the configurations that we actually enforce on the Heroku side is a minimum retention time. This is something that customers often ask about because it seems kind of weird. Why are you telling me that I should keep data around for at least this long? It seems like a bizarre thing for us to enforce, but through our testing, and if you look through the mailing lists, anything less introduces a lot of churn on the broker, and so you're basically wasting cycles, wasting IOPS, throwing out data that you could be doing something productive with. We found that 24 hours is kind of the sweet spot. Most customers ask us why that is the minimum, but it's almost never an issue.

We have a max retention time for our multi-tenant plans of seven days. We have a default replication factor of three, a minimum replication factor of three and a maximum replication factor of three. This should tell you that you really should have a replication factor of three; we don't allow anything else. This is one of the more significant guardrails that we have in place. Some of the defaults in Kafka would not necessarily give you three; you would create a topic and it would just give you a replication factor of one, and weird things start happening, so we strictly enforce three.

Within Kafka itself we've also changed the defaults, again to make them a little bit more sane, more safe. The default number of partitions we set is eight; the default for Kafka itself is one. With one you aren't gaining any of the benefits of scale: the topic will only reside on a single broker and your performance will be basically pinned to a single broker's worth of throughput. If you remember what I mentioned about quotas, quotas are enforced per broker, so if you create a one-partition topic and your quota is 128K, the maximum throughput you'll ever get through it is 128K because it's all going through one broker. We default the partition count to the same as the number of brokers in the cluster. Again, replication of three, we're pretty serious about it. If you missed it the first time, we're going to get it the second time. It's going to be three.

Min in-sync replicas, if you're not familiar with how this works: on the Kafka broker you can specify the number of replicas that have to confirm a write before the broker responds to the client and says, "I have received this message." Previous releases of Kafka had it set to one, or I think it may have been zero actually, so just the receiving broker says, "I have got this message," and the client thinks, "Great, I've written it, I should continue with my life." And then that broker dies and you've lost all your messages, which is kind of confusing and counterintuitive. Our default is set to two, and most recent versions of Kafka actually have it set to two as well.
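Expressed as a standard topic creation against a self-managed cluster, those guardrail defaults look roughly like the sketch below, using kafka-python's admin client. On Heroku's multi-tenant plans you would go through the Heroku API instead, as described earlier; the broker address and topic name are illustrative.

```python
# Sketch of the defaults described above, via kafka-python's admin API.
# Not how topics are created on Heroku multi-tenant plans (those go through
# the Heroku API); the broker address is an illustrative placeholder.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="broker-1.example.com:9096")

topic = NewTopic(
    name="wabash-58779.messages",
    num_partitions=8,                  # default of eight rather than Kafka's one
    replication_factor=3,              # replication of three, strictly enforced
    topic_configs={
        "retention.ms": str(7 * 24 * 60 * 60 * 1000),  # 7-day maximum retention
        "min.insync.replicas": "2",                    # two confirmed writes
    },
)
admin.create_topics([topic])
```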

A little bit about monitoring at Heroku. We use a custom monitoring tool, it's called Observatory, and it's pretty awesome. It's agentless, in the sense that one of the core beliefs of Heroku Data for its monitoring is that all monitoring should be done from the outside in. We don't actually deploy anything on the brokers or on the containers or on the services themselves. What that gives us is this amazing power where, through support or through on-call, we discover, oh, actually there's this metric that we should be monitoring and we aren't. We basically teach our monitoring tool how to look at that thing and now that metric is being exposed across every single instance in our fleet. We push some code and now a million databases are telling us something new. It's pretty spectacular, it's one of the best things that we've come up with, I think.

JMX is pretty great. JMX is purely a Java thing, which is not great, but Kafka itself exposes a ton of metrics about every aspect; everything you could possibly want to know about your Kafka broker, JMX can tell you, but it's Java only, which sucks. Jolokia is an HTTP proxy for JMX, and so when we deploy Kafka we deploy Jolokia with it, and that basically allows us to reap the benefits of JMX without having to run Java on our control plane itself, or on the monitoring side actually. That's pretty fantastic and I highly recommend it.
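As an example of what outside-in monitoring looks like in practice, here's a hedged sketch of reading a broker metric through Jolokia over plain HTTP. The host is a placeholder, 8778 is Jolokia's default agent port, and the MBean shown is a standard Kafka broker metric; none of this is necessarily Heroku's exact setup.

```python
# Read a Kafka broker metric through Jolokia's HTTP bridge to JMX.
import json
import urllib.request

mbean = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"
url = f"http://broker-1.example.com:8778/jolokia/read/{mbean}"

with urllib.request.urlopen(url) as resp:
    payload = json.load(resp)

# Jolokia returns the MBean's attributes as JSON, e.g. the one-minute rate.
print(payload["value"]["OneMinuteRate"])
```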

Testing. I mentioned we did a ton of testing. This was pretty grueling. We tested a ridiculous number of permutations: the number of partitions, the maximum number of partitions, what kind of throughput we should provide for each tenant, the maximum we can get with different message sizes. Kafka behaves very differently if you send lots of very small messages or very large messages.

What about common operations? We have a huge number of partitions, we're hammering it with a million large messages and then we decided to take one of the brokers out. Does it survive that? There were some topologies where you're like yes, this is the one. It's perfect, everything works fine. And then someone says, "Well, let's do a rolling restart," and then it just cascades and everything dies immediately. So yeah, we actually looked through all these failure scenarios. We would swap out a broker, we replaced one, we'd restart one. One of the servers just stops, an instance falls out from underneath us, so we tested all of those.

Yeah, similarly failure scenarios. If you run on AWS things will fail; if you have a large fleet, they will fail often. We look at those cases where we decide we're running a broker and we need to attach an EBS volume, does the Kafka cluster survive that? We looked at maximum packing density, again this is kind of a function of partitions mainly. But can we fit 100 tenants? Can we fit 500 tenants? What do those look like? How does it perform? What are the implications of failing over, that kind of stuff.

Bugs. Bugs was kind of a painful one. We did encounter bugs, obviously. Kafka is somewhat immature, it has a crazy development cycle, it's really actively being developed all the time. I'll cover some of those issues. But one thing which we arrived on pretty quickly, and I mentioned automation is kind of our thing, is that we want to automate testing. We developed an internal tool that leverages the internal Heroku APIs and basically simulates users. It stands up a cluster, an empty cluster, it creates 50 users and then it creates a producer and a consumer, or many producers and consumers, for these simulated users. We can actually define profiles for each user and say, we want 10% of our users to send very small amounts of traffic. We want 20% of them to send very large messages but at a very slow speed, and then we want the rest of them to do some random distribution of data.

We can actually script this and it's reproducible. We release a new version, we run a new version of Kafka, we maybe tune the OS a little bit, and we say, "All right, go ahead and hammer this cluster." It will provision all the users, it will push all this data through, and then we monitor it and look through the logs and say, okay, this worked pretty well, or we noticed that when we did a rolling restart at this point it struggled, it took too long to come up. So yeah, we automate everything, including our testing in this case. Yeah, so that was pretty cool.
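The gist of that kind of simulation can be sketched pretty compactly. This is an illustrative stand-in for the internal tool, not its actual code; the profile mix, topic name, and broker address are assumptions.

```python
# Illustrative sketch of tenant traffic profiles for load-testing a
# multi-tenant cluster; not Heroku's internal tooling.
import os
import random
import time
from kafka import KafkaProducer

PROFILES = [
    {"share": 0.10, "msg_bytes": 200,     "msgs_per_sec": 1},   # tiny trickle
    {"share": 0.20, "msg_bytes": 500_000, "msgs_per_sec": 2},   # large but slow
    {"share": 0.70, "msg_bytes": 5_000,   "msgs_per_sec": 50},  # everything else
]

def run_simulated_tenant(prefix: str, seconds: int = 60) -> None:
    """Pick a profile according to its share and produce at that rate."""
    profile = random.choices(PROFILES, weights=[p["share"] for p in PROFILES])[0]
    producer = KafkaProducer(bootstrap_servers="broker-1.example.com:9096")
    payload = os.urandom(profile["msg_bytes"])
    deadline = time.time() + seconds
    while time.time() < deadline:
        for _ in range(profile["msgs_per_sec"]):
            producer.send(f"{prefix}.load-test", payload)
        producer.flush()
        time.sleep(1)

# e.g. run 50 of these concurrently against a fresh cluster and watch how the
# brokers behave during rolling restarts, broker replacements, and failovers.
```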

Bugs. We found a bug while testing Kafka multi-tenant. Specifically it affects the way the quota is enforced. It's KAFKA-4725, you can look it up. We discovered this while testing. We noticed that we would run a test, and in this particular set of tests our test was: what happens if we set quotas to one megabyte and then customers completely ignore it and start writing at ten megabytes? Quotas looked like they were being enforced, and then suddenly one of the brokers would go down, and then all the other brokers would go down, and we were confused. We were like, what is going on? Why is this happening?

We dug into it a little bit deeper and we found that there was a memory leak. The way quotas are enforced, as I mentioned, is by delay: the broker slows you down by holding onto the response, so it doesn't ack until doing so would meet your quota. But essentially what that meant was the broker was holding onto a reference to each of those messages, so you kept sending data and it kept holding on to more and more references to the messages that you sent, and so memory would go through the roof, the JVM would kill it because it ran out of memory, and the broker would go down.

And then all of your traffic would shift to the other brokers and they would do the same thing and so you'd get this cascading failure pretty quickly. We found that. We contributed a PR and it was accepted and fixed in 0.10.1.1. Yeah, we're pretty active in testing new releases for Kafka, that's something we do and we always provide feedback. On the mailing list you may have seen us.

Automation, it's totally our thing. Like I mentioned, we are a pretty small team and we manage millions of databases so we have to automate. Kafka does a lot in terms of maintaining these clusters, in terms of leader election and failover and that kind of stuff, but it doesn't do everything. It has no awareness of underlying instances. As I mentioned, if you're on AWS instances will fail and if you run a fleet big enough they will fail a lot.

Our mantra, I guess, is to automate everything. We necessarily have to automate as much as possible. I semi-jokingly believe my job is to automate myself out of a job and so if I do a really, really good job then I will be useless and things will just run themselves.

What we do in terms of automation is we have a lot of tools for cluster resizing, for replacing nodes. The automation itself looks at the cluster and says, my plan says I should have eight brokers, so if I don't have eight I should resize to get eight. We can literally go into the AWS console out-of-band and start deleting instances and our automation will go, oh there's something not right, let me bring up another node, and it will start filling them out.

Storage expansion. Something that I mentioned earlier: as you write more data to Kafka, we detect, oh, you're at 80% of your storage, let's expand that. We'll keep expanding it as much as you need, essentially, until some other part of monitoring says, hang on, you've exceeded your limit by too much, so we're going to do something about that. We also automate version upgrades, so although you as a customer on a multi-tenant plan can't upgrade yourself, we automate the upgrades for the entire cluster. It's not feasible for us to manually go in and start messing around with hundreds or thousands of clusters, so we basically say, okay, now we're going to upgrade all of the Kafkas to 2.1, and the automation does it.

Restarts. I put a star because it is very surprising how effective a restart is. There is the joke of, have you tried turning it off and on again? That works really, really well. Actually 90% of our pages are handled automatically by automation, and the vast majority of that automation is to restart a thing. It could be restart the service, it could be restart the instance. But yeah, it turns out turning it off and on again tends to work really, really well, disproportionately well.

There is always more stuff that we can automate. We really, really automate everything. Our team is really, really small, so yeah, we automate as much as possible. My colleague, Tom Crayford, has a talk about running hundreds of Kafka clusters with a team of five. I highly recommend you take a look at it; he goes into much more detail about our automation.

I'm almost done, but some of the limitations. These are the unfortunate side effects of being able to provide what we do. We don't allow the admin API; we force explicit topic creation and explicit consumer group creation through the Heroku API.

Thank you very much. I will be taking office hours somewhere nearby and if you want to talk to me more, please do so. Thank you very much.

Audience Question: When you say you do a lot of testing, do you test with encryption enabled, so that the data is encrypted from client to broker? How much of a performance hit do you see when you do the encryption?

By default all of our multi-tenant plans require SSL encryption. There's no way for you not to use it on multi-tenant. We do see a performance hit, but typically it's a CPU performance hit, and almost 100% of our clusters are IO bound, so the practical implications are pretty small. The CPU doesn't run hot, but you're testing the limits of IO on the volumes.

Audience Question: Do you set up any schema registry to validate the produced input data?

Unfortunately schema registry is a Confluent product and so we are not allowed to run a managed schema registry for you, but you can deploy it yourself.

Thousands of developers use Heroku’s Apache Kafka service to process millions of transactions on our platform—and many of them do so through our multi-tenant Kafka service. Operating Kafka clusters at this scale requires careful planning to ensure capacity and uptime across a wide range of customer use cases. With significant automation and test suites, we’re able to do this without a massive operations team.

In this post, we're going to talk about how we've architected our infrastructure to be secure, reliable, and efficient for your needs, even as your events are processed in a multi-tenant environment.

What is Kafka used for?

Kafka is a distributed streaming platform, with "producers" generating messages and "consumers" reading those messages. The message store is written to disk and highly available. Instances that run Kafka are called brokers, and multiple brokers can serve as replicas for your data. If a single broker fails, the replicas are promoted, enabling continued operations with no downtime. Each Kafka cluster comes with a fully-managed ZooKeeper cluster that handles the configuration and synchronization of these different services, including access rights, health checks, and partition management.

Kafka is the key enabling technology in a number of data-heavy use cases. Some customers use Kafka to ingest a large amount of data from disparate sources. Some use Kafka to build event-driven architectures to process, aggregate, and act on data in real-time. And others use Kafka to migrate from a monolith to microservices, where it functions as an intermediary to process events while extracting out embedded functionality into smaller services during the migration process. (You can read all about how SHIFT Commerce did this on our blog.)

How Heroku runs Kafka

Heroku's least expensive single-tenant plan comes with three brokers. Everything within this cluster belongs to you, and it's a good option if performance and isolation are critical to your application.

The trade-off, of course, is that these plans tend to be about fifteen times more expensive than our least expensive multi-tenant plan. The multi-tenant plans are better suited for development and testing environments, or even for applications that simply don't need the immense power of a dedicated cluster.

Our multi-tenant plans have feature and operational parity that match our single-tenant offerings: they’re the same number of brokers, the same number of ZooKeeper nodes, and they behave in exactly the same way. While it would have been more cost-effective for us to have provided just a single broker/single ZooKeeper option, we felt that our users would then be developing against something that's not really representative of real life. Instead, we give you access to something that represents what a real production cluster would look like at a fraction of the cost. For example, if your code only acts against a single node, you have no opportunity to anticipate and build for common failure scenarios. When you’re ready to upgrade from a multi-tenant Kafka service to a single-tenant setup, your application is already prepared for it.

We're also committed to ensuring that our multi-tenant options are still secure and performant no matter how many customers are sharing a single cluster. The rest of this post goes into more details on how we've accomplished that.

Security through isolation

Even though many different customers can be situated on a single multi-tenant Kafka cluster, our top priority is to ensure that no one can access anyone else's data. We do this primarily in two ways.

First, Kafka has support for access control lists (ACLs), and it is enforced at the cluster-level. This allows us to specify which users can perform which actions on which topics. Second, we also namespace our resources on a per-tenant level. When you create a new Kafka add-on, we automatically generate a name and associate it with your account, like wabash-58799. Thereafter, any activity on that resource is only accessible by your application. This guarantees another level of security that is essentially unique to Heroku.

A single tenant should also not disturb any of the other tenants on a cluster, primarily when it comes to performance. Your usage of Kafka should not degrade if another Heroku user is processing an immense number of events. To mitigate this, Kafka supports quotas for producers/consumers, and we enforce the number of bytes per second a user can write or read. This way, no matter how many events are available to act upon, every user on the cluster is given their fair share of computing resources to respond.

Keeping Kafka available

When you buy Kafka from Heroku, we immediately provision the full set of resources that you purchased; we never take shortcuts or overprovision workloads. When it comes to storage, for example, we will automatically expand the amount of disk space you have available to use. If you reach 80% of your quota, we will expand that hard drive so that you can continue to write data to Kafka. We'll keep expanding it (while sending you email reminders that you've exceeded your capacity) so as to not interrupt your workflow. If you go far above your limit without addressing the problem, we'll throttle your throughput, but still keep the cluster operational for you to use.

In many cases, for our sake and yours, we set configuration defaults that are sane and safe, and often times even higher than what Kafka initially recommends. Some of these include higher partition settings (from one to eight, to maximize throughput), additional replicas (from one to three, to ensure your data is not lost), and more in-sync replicas (from one to two, to truly confirm that a replica received a write).

Applying our tests to production

We tested our Kafka infrastructure across a variety of permutations in order to simulate how the multi-tenant clusters would behave in the real world.

In order to find the most optimal configurations and observe how Kafka's performance changed, we tried multiple combinations, from throughput limits to the maximum number of partitions.

Similarly, we replicated several failure scenarios, to see whether the service could survive unexpected changes. We would hammer a cluster with a million messages, and then take one of the brokers offline. Or, we'd operate a cluster normally, and then decide to stop and restart the server, to verify whether the failover process works.

Our tests create an empty cluster, and then generate 50 users that attach the Kafka add-on to their application; they then create many producers and consumers for these simulated users. From there, we define usage profiles for each user. For example, we'll say that 10% of our users send very small amounts of traffic, and 20% of them send very large messages but at a very slow speed, and then the rest of them do some random distribution of data. Through this process, and while gradually increasing the number of users, we’re able to determine the multi-tenant cluster’s operational limits.

From these incidents and observations, we were able to identify issues in our setups and adjust assumptions to truly make the infrastructure resilient, before they became problems for our users. For a deeper look into how we’ve set up all this testing and automation, my colleague Tom Crayford has given a talk called “Running Hundreds of Kafka Clusters with 5 People.”

More information

If you'd like to learn more about how we operate Kafka, or what our customers have built with this service, check out our previous blog posts. Or, you can watch this demo for a more technical examination on what Apache Kafka on Heroku can do for you.

Originally published: July 11, 2019
