TCP Congestion Control and Spark Back Pressure

Bandwidth in the Internet is a rivalrous resource; consumption by any consumer grants utility and inhibits consumption by other consumers. Unless consumers sometimes relent in their pursuit of bandwidth, buffers on routers will spend most of their time discarding excess packets, making point-to-point connections unsustainable. Stopping the Internet from collapsing requires a distributed algorithm along the lines of the old adage “do as you would be done by”. Such algorithms have existed for decades, are well understood, and have some interesting properties that can be applied to throughput problems in application development.

Congestion Control

In 1986 the throughput on the campus network at UC Berkeley, consisting of three links, fell from 32Kbps to 40bps without explanation. Van Jacobson realised that this was caused by buffer overload at routers, leading to an excessive fraction of network packets being dropped, preventing establishment and sustenance of TCP connections. In order to optimise aggregate bandwidth, utilisation should be controlled at a level where packet loss at routers is minimised. One solution to the problem would be to grant all bandwidth to a single consumer, but this has obvious drawbacks: any allocation should be fair and stable. A centralised mediator, such as a government, is computationally infeasible, so the algorithm must be decentralised.

Jacobson’s Algorithm

Van Jacobson devised an algorithm for TCP clients which uses packet loss (manifested in corruption or timeouts) as a congestion indicator. Each client maintains a congestion window which limits the number of unacknowledged packets at any point in time. When congestion is detected, the client backs off. The algorithm splits the lifecycle of a TCP connection into two phases: Slow Start and Congestion Avoidance.

  1. Slow Start occurs when a connection is created or after a packet times out:
    1. Initialise the congestion window (cwnd) to one maximum segment size (MSS).
    2. For each acknowledged packet, increase cwnd by one MSS until the slow start threshold (ssthresh) is reached or a packet is lost. This is more or less exponential increase (and favours short connections).
    3. When a loss event occurs, set ssthresh to half the current value of cwnd, reset cwnd to one MSS and resume slow start.
  2. Congestion avoidance starts when the slow start threshold is reached.
    1. Every time cwnd packets are acknowledged, add the MSS to cwnd. This is linear increase.
    2. When a loss event occurs, set ssthresh to half the current value of cwnd, halve cwnd and resume linear increase.

The algorithm aims to spend as much time as possible in congestion avoidance to maintain stability, and aims to find a fair value of the slow start threshold with respect to all consumers. The time series of cwnd during a TCP connection makes a saw tooth after an initial exponential increase.
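The two phases can be sketched as a toy model. This is an illustration rather than a TCP implementation: the window is counted in whole segments, one call represents a full round trip of acknowledgements, and the class name, the `capacity` parameter and the floor of two segments on the threshold are assumptions made for the example. It also assumes the threshold is set to half of the window at the time of the loss.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the congestion window, measured in segments (multiples of the MSS). */
final class CwndSimulator {
    private double cwnd = 1.0;   // start with a single segment
    private double ssthresh;     // slow start threshold

    CwndSimulator(double initialSsthresh) {
        this.ssthresh = initialSsthresh;
    }

    /** One round trip in which every outstanding segment is acknowledged. */
    void onRoundTripAcked() {
        if (cwnd < ssthresh) {
            // slow start: one extra segment per ACK doubles the window each round trip
            cwnd = Math.min(cwnd * 2, ssthresh);
        } else {
            // congestion avoidance: one extra segment per round trip
            cwnd += 1;
        }
    }

    /** Loss signalled by a timeout: remember half the window and restart slow start. */
    void onTimeout() {
        ssthresh = Math.max(cwnd / 2, 2);
        cwnd = 1.0;
    }

    /** Loss during congestion avoidance: halve the window and carry on linearly. */
    void onLoss() {
        ssthresh = Math.max(cwnd / 2, 2);
        cwnd = ssthresh;
    }

    double cwnd() {
        return cwnd;
    }

    /** Records cwnd per round trip, injecting a loss whenever cwnd reaches the (hypothetical) capacity. */
    static List<Double> trace(int roundTrips, double capacity, double initialSsthresh) {
        CwndSimulator sim = new CwndSimulator(initialSsthresh);
        List<Double> trace = new ArrayList<>();
        for (int i = 0; i < roundTrips; i++) {
            trace.add(sim.cwnd());
            if (sim.cwnd() >= capacity) {
                sim.onLoss();
            } else {
                sim.onRoundTripAcked();
            }
        }
        return trace;
    }
}
```

Calling `CwndSimulator.trace(14, 16, 8)` shows the window doubling up to the threshold, climbing one segment at a time to the capacity, then dropping to half and climbing again, which is the saw tooth described above.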

This algorithm prevents congestion collapse and can be shown to be stable at an aggregate level. However, there are some issues with it. For instance, the algorithm favours short connections (those with short round trip times) both during slow start and congestion avoidance. Given the choice of a short oversubscribed route and a long undersubscribed route, the algorithm will choose the short oversubscribed route. In particular, the linear increase during congestion avoidance is too slow, which means that too much time is spent away from the optimal bandwidth.

Since Jacobson’s algorithm was devised, many variations on its theme have been proposed and some implemented, all aiming to maximise the time spent at a Pareto-optimal per connection bandwidth. For instance, the BIC (Binary Increase Congestion) algorithm replaces the linear congestion avoidance phase with a binary search. CUBIC, which replaces the linear phase with a cubic function (the inflection point set at the level of the cwnd at the last congestion event), has been the default algorithm in the Linux kernel since 2.6.19. From 3.2 onwards the kernel also uses Proportional Rate Reduction, devised by Google, which does not wait for the acknowledgement of outstanding packets before retransmission; it governs loss recovery rather than replacing CUBIC, which remains the default congestion control algorithm.

[Figure: BIC and CUBIC growth functions]
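The cubic growth function can be written down directly. The sketch below follows the form and the suggested constants given in RFC 8312 (C = 0.4, multiplicative decrease factor 0.7); the class and method names are made up for the example, and it models only the curve, not a congestion controller.

```java
/** The window growth curve used by CUBIC, in the form given in RFC 8312. */
final class CubicWindow {
    static final double C = 0.4;     // scaling constant suggested by RFC 8312
    static final double BETA = 0.7;  // fraction of the window kept after a congestion event

    /** Seconds the curve takes to climb back to wMax, i.e. the position of the inflection point. */
    static double k(double wMax) {
        return Math.cbrt(wMax * (1 - BETA) / C);
    }

    /** Window t seconds after the last congestion event, where wMax was the window at that event. */
    static double window(double t, double wMax) {
        double d = t - k(wMax);
        return C * d * d * d + wMax;
    }
}
```

Immediately after a congestion event the curve starts at `BETA * wMax`, flattens as it approaches `wMax` (the inflection point), and then accelerates away from it to probe for spare bandwidth.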

Spark Back-Pressure

When Spark back pressure is enabled and a queue of micro-batches builds up, Spark will automatically resize the batch to make it smaller. The upstream system can either block or hold onto the unconsumed messages for a bit longer. When the backlog has cleared, Spark starts increasing the batch size. I haven’t yet figured out whether Spark is explicitly optimising for minimal queue length, minimal queued bytes, maximal throughput or minimal latency but the act of varying the batch size is a search for an optimum.

Though the feature works well, decreases are often more aggressive than necessary and increases slower than feasible. Sometimes queues build up for intrinsic reasons: the job is too slow to service the batch size at the desired polling frequency. Sometimes the build up is caused extrinsically, for instance by temporary cluster overload. If the back-pressure is too aggressive and the cause extrinsic, then, unless the algorithm probes aggressively, recovery will be delayed. It would be interesting to vary the batch sizing algorithm along the lines of variations on Jacobson’s algorithm, like replacing linear increase with binary recovery/probing, and assess the optimality (choose one or many of maximise throughput/minimise latency/minimise risk of OOM) and stability (for what proportion of time does the batch size stay at the optimal level?).
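To make the idea concrete, here is a minimal sketch of a batch sizer that recovers by binary search, in the spirit of BIC. It is not Spark's rate controller and does not use any Spark API; the class name, the halving on backlog and the ten percent probing step are all assumptions chosen for illustration.

```java
/** Hypothetical batch sizer: halve on backlog, recover by binary search, then probe slowly. */
final class BinaryIncreaseBatchSizer {
    private final long minSize;
    private final long maxSize;
    private long current;
    private long lastCongested;  // batch size at which a backlog was last observed

    BinaryIncreaseBatchSizer(long minSize, long maxSize, long initial) {
        this.minSize = minSize;
        this.maxSize = maxSize;
        this.current = initial;
        this.lastCongested = maxSize;
    }

    /** Called when a queue of batches has built up. */
    long onBacklog() {
        lastCongested = current;
        current = Math.max(minSize, current / 2);
        return current;
    }

    /** Called when the backlog has cleared. */
    long onCleared() {
        if (current < lastCongested) {
            // binary search: jump halfway back towards the size that last caused a backlog
            current += Math.max(1, (lastCongested - current) / 2);
        } else {
            // beyond the old limit: probe gently for spare capacity
            current = Math.min(maxSize, current + Math.max(1, current / 10));
        }
        return current;
    }

    long current() {
        return current;
    }
}
```

Starting from 1000 records, a backlog drops the batch to 500, and successive clear intervals give 750, 875 and so on. Most of the lost ground is recovered in the first step or two, which is the behaviour one would hope for when the overload was extrinsic and temporary.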

Concise Binary Object Representation

Concise Binary Object Representation (CBOR), defined by RFC 7049, is a binary, typed, self-describing serialisation format. In contrast with JSON, it is binary and distinguishes between different sizes of primitive type properly. In contrast with Avro and Protobuf, it is self-describing and can be used without a schema. As with all binary formats, in cases where data is overwhelmingly numeric, both parsing time and storage size are far superior to JSON. For textual data, payloads are also typically smaller with CBOR.

The Type Byte

The first byte of every value denotes a type. The most significant three bits denote the major type (for instance byte array, unsigned integer). The last five bits of the first byte denote a minor type (float32, int64 and so on). This is useful for type inference and validation. For instance, if you wanted to save a BLOB into HBase and map that BLOB to a Spark SQL Row, you can map the first byte of each field value to a Spark DataType. If you adopt a schema-on-read approach, you can validate the supplied schema against the type encoding in the CBOR encoded blobs. The major types and some interesting minor types are enumerated below, but see the definitions for more information.

  • 0:  unsigned integers
  • 1:  negative integers
  • 2:  byte strings (terminated by 7_31 when of indefinite length)
  • 3:  UTF-8 text (terminated by 7_31 when of indefinite length)
  • 4:  arrays (terminated by 7_31 when of indefinite length)
  • 5:  maps (terminated by 7_31 when of indefinite length)
  • 6:  tags (0: timestamp strings, 1: unix epoch longs, 2: big integers…)
  • 7:  floating-point numbers and simple values (20: False, 21: True, 22: Null, 23: Undefined, 26: float, 27: double, 31: stop byte for indefinite length fields such as maps and arrays)
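Splitting the first byte into its two parts takes a shift and a mask. The helper below is a small sketch with made-up names; it covers only the major types and the minor values listed above, not the whole specification.

```java
/** Splits the first byte of a CBOR value into its major type and its five low bits. */
final class CborInitialByte {

    /** The three most significant bits. */
    static int majorType(byte initial) {
        return (initial & 0xFF) >>> 5;
    }

    /** The five least significant bits (the minor type). */
    static int additionalInfo(byte initial) {
        return initial & 0x1F;
    }

    /** A human readable name, covering only the values listed above. */
    static String describe(byte initial) {
        int info = additionalInfo(initial);
        switch (majorType(initial)) {
            case 0: return "unsigned integer";
            case 1: return "negative integer";
            case 2: return "byte string";
            case 3: return "text string";
            case 4: return "array";
            case 5: return "map";
            case 6: return "tag";
            default:
                switch (info) {
                    case 20: return "false";
                    case 21: return "true";
                    case 22: return "null";
                    case 23: return "undefined";
                    case 26: return "float";
                    case 27: return "double";
                    case 31: return "stop";
                    default: return "simple value";
                }
        }
    }
}
```

For example, the byte 0xFB has major type 7 and minor type 27, so a field starting with it holds a double and could be mapped to a double-typed column when inferring a schema.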

Usage

In Java, CBOR is supported by Jackson and can be used as if it is JSON. It is available as a Maven dependency:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-cbor</artifactId>
    <version>2.8.4</version>
</dependency>

Wherever you would use an ObjectMapper to work with JSON, just use an ObjectMapper with a CBORFactory instead of the default JsonFactory.

ObjectMapper mapper = new ObjectMapper(new CBORFactory());

Jackson integrates CBOR into JAX-RS seamlessly via

<dependency>
    <groupId>com.fasterxml.jackson.jaxrs</groupId>
    <artifactId>jackson-jaxrs-cbor-provider</artifactId>
    <version>2.8.4</version>
</dependency>

If a JacksonCBORProvider is registered in a Jersey ResourceConfig (a one-liner), then any resource method annotated as @Produces("application/cbor"), or any HTTP request with the Accept header set to “application/cbor” will automatically serialise the response as CBOR.

Jackson deviates from the specification slightly by promoting floats to doubles: despite parsing floats properly, it post-processes them as doubles. Jackson recognises floats properly as of 2.8.6, and distinguishes between longs and ints correctly so long as CBORGenerator.Feature.WRITE_MINIMAL_INTS is disabled on the writer.

In JavaScript, cbor.js can be used to deserialise CBOR, though loss of browser native support for parsing is a concern. It would be interesting to see some benchmarks for typical workloads to evaluate the balance of the cost of JavaScript parsing versus the benefits of reduced server side cost of generation and reduced message size. Again, for large quantities of numeric data this is more likely to be worthwhile than with text.

Comparison with JSON – Message Size

Textual data is slightly smaller when represented as CBOR as opposed to JSON. Given the interoperability that comes with JSON, it is unlikely to be worth using CBOR over JSON for reduced message size.

Large arrays of doubles are a lot smaller in CBOR. Interestingly, large arrays of small integers may actually be smaller as text than as binary; it takes only two bytes to represent 10 as text, whereas it takes four bytes in binary. Outside of the range of -99 to 999 this is no longer true, but might be a worthwhile economy for large quantities of survey results.
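The arithmetic can be checked with a few lines of code. The sketch below compares the decimal text length of an integer with two CBOR encodings: a fixed 32-bit width (one type byte plus four payload bytes, roughly the situation when minimal ints are disabled) and the shortest form the CBOR specification allows, in which values up to 23 fit in the type byte itself. The class and method names are made up for the example, and separators between array elements are ignored.

```java
/** Compares encoded sizes of a non-negative integer as decimal text and as CBOR. */
final class IntegerSizes {

    /** Characters needed to write the value as decimal text. */
    static int textSize(long value) {
        return Long.toString(value).length();
    }

    /** Bytes needed when every integer is written as a 32-bit value: type byte plus four bytes. */
    static int cborFixedInt32Size() {
        return 5;
    }

    /** Bytes needed using CBOR's shortest encoding for a non-negative integer. */
    static int cborMinimalSize(long value) {
        if (value < 0) {
            throw new IllegalArgumentException("only non-negative values are handled in this sketch");
        }
        if (value <= 23) return 1;           // value stored in the type byte itself
        if (value <= 0xFFL) return 2;        // type byte + 1 byte
        if (value <= 0xFFFFL) return 3;      // type byte + 2 bytes
        if (value <= 0xFFFFFFFFL) return 5;  // type byte + 4 bytes
        return 9;                            // type byte + 8 bytes
    }
}
```

With fixed-width integers the text form of a small number such as 10 is shorter, as observed above; with shortest-form encoding the same value needs a single byte, so the comparison depends on how the writer is configured.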

JSON and CBOR message sizes for messages containing mostly textual, mostly integral and mostly floating point data are benchmarked at GitHub. The output is as follows:

CBOR, Integers: size=15122B
JSON, Integers: size=6132B
CBOR, Doubles: size=27122B
JSON, Doubles: size=54621B
CBOR, Text: size=88229B
JSON, Text: size=116565B

Comparison with JSON – Read/Write Performance

Using Jackson to benchmark the size of the messages is not really a concern since it implements each specification; the output and therefore size should have been the same no matter which library produced the messages. Measuring read/write performance of a specification is difficult because only the implementation can be measured. It may well be the case that either JSON or CBOR can be read and written faster by another implementation than Jackson (though I expect Jackson is probably the fastest for either format). In any case, measuring Jackson CBOR against Jackson JSON seems fair. I benchmarked JSON vs CBOR reads and writes using the Jackson implementations of each format and JMH. The code for the benchmark is at GitHub.

The results are as below. CBOR has significantly higher throughput for both read and write, with the exception of integer writes, where the two formats are within error of each other.

Benchmark            Mode   Count  Score   Error   Units
readDoubleDataCBOR   thrpt  5      12.230  ±1.490  ops/ms
readDoubleDataJSON   thrpt  5      0.913   ±0.046  ops/ms
readIntDataCBOR      thrpt  5      16.033  ±3.185  ops/ms
readIntDataJSON      thrpt  5      8.400   ±1.219  ops/ms
readTextDataCBOR     thrpt  5      15.736  ±3.729  ops/ms
readTextDataJSON     thrpt  5      1.065   ±0.026  ops/ms
writeDoubleDataCBOR  thrpt  5      26.222  ±0.779  ops/ms
writeDoubleDataJSON  thrpt  5      0.930   ±0.022  ops/ms
writeIntDataCBOR     thrpt  5      31.095  ±2.116  ops/ms
writeIntDataJSON     thrpt  5      33.512  ±9.088  ops/ms
writeTextDataCBOR    thrpt  5      31.338  ±4.519  ops/ms
writeTextDataJSON    thrpt  5      1.509   ±0.245  ops/ms
readDoubleDataCBOR   avgt   5      0.078   ±0.003  ms/op
readDoubleDataJSON   avgt   5      1.123   ±0.108  ms/op
readIntDataCBOR      avgt   5      0.062   ±0.008  ms/op
readIntDataJSON      avgt   5      0.113   ±0.012  ms/op
readTextDataCBOR     avgt   5      0.058   ±0.007  ms/op
readTextDataJSON     avgt   5      0.913   ±0.240  ms/op
writeDoubleDataCBOR  avgt   5      0.038   ±0.004  ms/op
writeDoubleDataJSON  avgt   5      1.100   ±0.059  ms/op
writeIntDataCBOR     avgt   5      0.031   ±0.002  ms/op
writeIntDataJSON     avgt   5      0.029   ±0.004  ms/op
writeTextDataCBOR    avgt   5      0.032   ±0.003  ms/op
writeTextDataJSON    avgt   5      0.676   ±0.044  ms/op

The varying performance characteristics of media types/serialisation formats based on the predominant data type in a message make proper HTTP content negotiation important. It cannot be known in advance when writing a server application what the best content type is, and it should be left open to the client to decide.
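A JAX-RS container performs this negotiation for you, but the principle is simple enough to sketch by hand. The helper below picks the supported media type with the highest q value from an Accept header. It is a simplified illustration with made-up names: wildcards such as `*/*` are ignored, and ties go to whichever type is listed first.

```java
import java.util.List;
import java.util.Locale;

/** Minimal Accept header negotiation: highest q value among the supported types wins. */
final class ContentNegotiator {

    static String choose(String acceptHeader, List<String> supported, String fallback) {
        if (acceptHeader == null) {
            return fallback;
        }
        String best = null;
        double bestQ = 0.0;
        for (String part : acceptHeader.split(",")) {
            String[] tokens = part.trim().split(";");
            String type = tokens[0].trim().toLowerCase(Locale.ROOT);
            double q = 1.0;  // a media type without a q parameter has the highest preference
            for (int i = 1; i < tokens.length; i++) {
                String param = tokens[i].trim();
                if (param.startsWith("q=")) {
                    try {
                        q = Double.parseDouble(param.substring(2));
                    } catch (NumberFormatException e) {
                        q = 0.0;  // treat a malformed q value as "not acceptable"
                    }
                }
            }
            if (supported.contains(type) && q > bestQ) {
                best = type;
                bestQ = q;
            }
        }
        return best != null ? best : fallback;
    }
}
```

A client sending mostly doubles might ask for `application/cbor, application/json;q=0.5`, while a browser based client that prefers native parsing could reverse the weights; the server honours whichever it receives.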

Perpetual Kerberos Login in Hadoop

Kerberos is the only real option for securing a Hadoop cluster. When deploying custom services into a cluster with Kerberos enabled, authentication can quickly become a cross-cutting concern.

Kerberos Basics

First, a brief introduction to basic Kerberos mechanisms. In each realm there is a Key Distribution Centre (KDC) which issues different types of tickets. A KDC has two services: the Authentication Service (AS) and the Ticket Granting Service (TGS). There are two ticket types issued: Ticket Granting Tickets (TGT) and Service Tickets. Every KDC has a special user called krbtgt and a service key derived from the password for the krbtgt account; a TGT is actually just a service ticket for the krbtgt account, encrypted with the krbtgt service key. The KDC has all the symmetric keys for all services and users in its realm.

The end goal of a user requesting a Kerberised service is to be able to present a service ticket obtained from the Ticket Granting Service to the Kerberised service in order to authenticate itself, but it needs to get a TGT and a TGS session key from the Authentication Service first. The sequence of requests and responses is as follows:

  1. REQ_AS: The Client requests an initial TGT from the KDC’s Authentication Service by passing its user key (the user key comes from a keytab file or a username and password). The presented key is checked against the client’s symmetric key (the KDC has this encrypted with its own service key).
  2. REP_AS: The Authentication Service issues a TGT which contains a TGS session key. The TGT has a lifetime and a renewable lifetime. Within the lifetime, the TGT can be cached and REQ_AS does not need to be made again: TGT lookup does not need to happen on each service request. The client creates an authenticator. The details of authenticator construction are too complicated to outline here. If a TGT is renewable, then only the TGS session key (not the TGT, which is large) need be refreshed periodically, and for each renewal the lifetime is reset.
  3. REQ_TGS: Now the client has a TGS session key, it can request a service ticket from the TGS. The client must know the service name, and have a TGS session key and an authenticator. If no TGS session key is found, REQ_AS must be reissued. REQ_TGS must be performed for each service (if you need to access Kafka as well as HBase, you would need to do REQ_TGS twice, once for Kafka and once for HBase, though your TGT and TGS session key are good for both).
  4. REP_TGS: The TGS has a local copy of the TGT associated with the TGS session key, which it checks against the authenticator and issues a service ticket. The service ticket is encrypted with the requested service’s symmetric key. Finally the user has a service ticket.
  5. REQ_APP: The client sends the service ticket to the service. The service decrypts the service ticket (it is encrypted by the TGS with the service’s symmetric key.)
  6. REP_APP (optional): The client can request mutual authentication, in which case the service will respond with another ticket.

UserGroupInformation API

Kerberos is quite simple in Java if you have access to JAAS. Some of the newer Hadoop ecosystem projects do use it (e.g. Kafka, Solr) but if you are using HBase or HDFS you need to use UserGroupInformation. The only part of the Kerberos mechanism pertinent for most use cases is TGT acquisition; UserGroupInformation will handle the rest.

To get a TGT, you need a principal name and a keytab so UserGroupInformation can issue REQ_AS.

UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(clientPrincipalName, pathToKeytab);
UserGroupInformation.setLoginUser(ugi);

If your keytab is good, this will give you a TGT and a TGS session key. HBase and HDFS components will get the created UserGroupInformation from the static method UserGroupInformation.getLoginUser(). In HADOOP-6656 a background task was added to perform TGS session key renewal. This will keep you logged in until the renewable lifetime is exhausted, so long as renewable tickets are enabled in your KDC. When the renewable lifetime is exhausted, your application will not be able to authenticate.

To get around that, you can use UserGroupInformation to perform REQ_AS on a scheduled basis. This grants perpetuity.

UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
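The difference between renewing and logging in again comes down to two timestamps on the ticket. The sketch below models that decision with plain java.time types. It illustrates the rule described above and is not what UserGroupInformation does internally; the class, enum and parameter names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Models the choice between doing nothing, renewing a ticket, and logging in again. */
final class TicketLifecycle {

    enum Action { NONE, RENEW, RELOGIN }

    /**
     * @param now            current time
     * @param expires        end of the ticket's current lifetime
     * @param renewableUntil end of the ticket's renewable lifetime
     * @param margin         how long before expiry we want to act
     */
    static Action nextAction(Instant now, Instant expires, Instant renewableUntil, Duration margin) {
        Instant actBy = now.plus(margin);
        if (actBy.isBefore(expires)) {
            return Action.NONE;     // still comfortably inside the current lifetime
        }
        if (now.isBefore(expires) && actBy.isBefore(renewableUntil)) {
            return Action.RENEW;    // the ticket is still valid and renewal is still allowed
        }
        return Action.RELOGIN;      // expired, or renewable lifetime exhausted: REQ_AS is needed
    }
}
```

With a ten hour lifetime and a seven day renewable lifetime, the action is RENEW shortly before each ten hour boundary and becomes RELOGIN as the seventh day approaches, which is the point at which the scheduled relogin from the keytab takes over.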

KerberosFacade

This can be done by a ScheduledExecutorService and wrapped up into a simple facade allowing you to login, logout, and execute actions as the logged in user, for as long as your service is up.

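import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// AuthenticationFailureListener is a callback interface (not shown) with a single method, handle(KerberosFacade, Exception)
public class KerberosFacade implements Closeable {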

  private static final Logger LOGGER = LoggerFactory.getLogger(KerberosFacade.class);

  private final ScheduledExecutorService refresher;
  private final String keytab;
  private final String user;
  private final int requestTGTFrequencyHours;
  private volatile ScheduledFuture<?> renewal;
  private final AuthenticationFailureListener failureListener;

  public KerberosFacade(AuthenticationFailureListener failureListener,
                        String keytab,
                        String user,
                        int reloginScheduleHours) {
    this.failureListener = wrap(failureListener);
    this.keytab = keytab;
    this.user = user;
    this.requestTGTFrequencyHours = reloginScheduleHours;
    this.refresher = Executors.newSingleThreadScheduledExecutor();
  }

  public void login() throws IOException {
    UserGroupInformation loginUser =
         UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, keytab);
    UserGroupInformation.setLoginUser(loginUser);
    this.renewal = refresher.scheduleWithFixedDelay(() -> {
      try {
        UserGroupInformation.getLoginUser()
                            .checkTGTAndReloginFromKeytab();
      } catch (Exception e) {
        onFailure(e);
      }
    }, requestTGTFrequencyHours, requestTGTFrequencyHours, TimeUnit.HOURS);
  }

  public void logout() {
    stopRefreshing();
    UserGroupInformation.setLoginUser(null);
  }

  public <T> T doAs(PrivilegedAction<T> action) {
    try {
      return UserGroupInformation.getCurrentUser()
                                 .doAs(action);
    } catch (IOException e) {
      onFailure(e);
      return null;
    }
  }

  public <T> T doAs(PrivilegedExceptionAction<T> action) throws PrivilegedActionException {
    try {
      return UserGroupInformation.getCurrentUser()
                                 .doAs(action);
    } catch (InterruptedException | IOException e) {
      onFailure(e);
      return null;
    }
  }

  @Override
  public void close() throws IOException {
    logout();
    refresher.shutdownNow();
  }

  private void stopRefreshing() {
    if (null != this.renewal) {
      this.renewal.cancel(true);
    }
  }

  protected void onFailure(Exception e) {
    failureListener.handle(this, e);
  }

  private static AuthenticationFailureListener wrap(AuthenticationFailureListener listener) {
    return (f, e) -> {
      LOGGER.error("Authentication Failure for " + f.user, e);
      if(null != listener) {
        listener.handle(f, e);
      }
    };
  }
}

Co-locating Spark Partitions with HBase Regions

HBase scans can be accelerated if they start and stop on a single region server. IO costs can be reduced further if the scan is executed on the same machine as the region server. This article is about extending the Spark RDD abstraction to load an RDD from an HBase table so each partition is co-located with a region server. This pattern could be adopted to read data into Spark from other sharded data stores, whenever there is a metadata protocol available to dictate partitioning.

The strategy involves creating a custom implementation of the Spark class RDD, which understands how to create partitions from metadata about HBase regions. To read data from HBase, we want to execute a scan on a single region server, and we want to execute on the same machine as the region to minimise IO. Therefore we need the start key, stop key, and hostname for each region associated with each Spark partition.

import org.apache.spark.Partition;

public class HBasePartition implements Partition {

  private final String regionHostname;
  private final int partitionIndex;
  private final byte[] start;
  private final byte[] stop;

  public HBasePartition(String regionHostname, int partitionIndex, byte[] start, byte[] stop) {
    this.regionHostname = regionHostname;
    this.partitionIndex = partitionIndex;
    this.start = start;
    this.stop = stop;
  }

  public String getRegionHostname() {
    return regionHostname;
  }

  public byte[] getStart() {
    return start;
  }

  public byte[] getStop() {
    return stop;
  }

  @Override
  public int index() {
    return partitionIndex;
  }
}

The HBase interface RegionLocator, which can be obtained from a Connection instance, can be used to build an array of HBasePartitions. It aids efficiency to check if it is possible to skip each region entirely, if the supplied start and stop keys do not overlap with its extent.

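import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partition;

public class HBasePartitioner implements Serializable {

  public Partition[] getPartitions(byte[] table, byte[] start, byte[] stop) {
    // close the Connection as well as the RegionLocator once the region metadata has been read
    try(Connection connection = ConnectionFactory.createConnection();
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(table))) {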
      List<HRegionLocation> regionLocations = regionLocator.getAllRegionLocations();
      int regionCount = regionLocations.size();
      List<Partition> partitions = Lists.newArrayListWithExpectedSize(regionCount);
      int partition = 0;
      for(HRegionLocation regionLocation : regionLocations) {
        HRegionInfo regionInfo = regionLocation.getRegionInfo();
        byte[] regionStart = regionInfo.getStartKey();
        byte[] regionStop = regionInfo.getEndKey();
        if(!skipRegion(start, stop, regionStart, regionStop)) {
          partitions.add(new HBasePartition(regionLocation.getHostname(),
                                            partition++,
                                            max(start, regionStart),
                                            min(stop, regionStop)));
        }
      }
      return partitions.toArray(new Partition[partition]);
    }
    catch (IOException e) {
      throw new RuntimeException("Could not create HBase region partitions", e);
    }
  }

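  private static boolean skipRegion(byte[] scanStart, byte[] scanStop, byte[] regionStart, byte[] regionStop) {
    // an empty start key means "from the first row" and an empty stop key means "to the last row",
    // so an empty key on either side of a comparison can never rule a region out
    boolean scanStartsAfterRegion = scanStart.length > 0 && regionStop.length > 0
                                    && Bytes.compareTo(scanStart, regionStop) >= 0;
    boolean scanStopsBeforeRegion = scanStop.length > 0 && regionStart.length > 0
                                    && Bytes.compareTo(scanStop, regionStart) <= 0;
    return scanStartsAfterRegion || scanStopsBeforeRegion;
  }

  // used for stop keys only: an empty stop key is unbounded, so the other key is the smaller one
  private static byte[] min(byte[] left, byte[] right) {
    if(left.length == 0) {
      return right;
    }
    if(right.length == 0) {
      return left;
    }
    return Bytes.compareTo(left, right) < 0 ? left : right;
  }

  // used for start keys only: an empty start key is unbounded, so the other key is the larger one
  private static byte[] max(byte[] left, byte[] right) {
    if(left.length == 0) {
      return right;
    }
    if(right.length == 0) {
      return left;
    }
    return Bytes.compareTo(left, right) >= 0 ? left : right;
  }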
}
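The empty-key conventions are easy to get wrong, so it can help to exercise the overlap check in isolation. The sketch below restates it without any HBase dependency, assuming Java 9 or later for Arrays.compareUnsigned (an unsigned lexicographic comparison, standing in for Bytes.compareTo). The class and method names are made up; start keys are inclusive, stop keys exclusive, and an empty array means unbounded.

```java
import java.util.Arrays;

/** Key range helpers where an empty start means "first row" and an empty stop means "last row". */
final class KeyRanges {

    /** True if the scan range [scanStart, scanStop) intersects the region [regionStart, regionStop). */
    static boolean overlaps(byte[] scanStart, byte[] scanStop, byte[] regionStart, byte[] regionStop) {
        boolean scanStartsAfterRegion = scanStart.length > 0 && regionStop.length > 0
                && Arrays.compareUnsigned(scanStart, regionStop) >= 0;
        boolean scanStopsBeforeRegion = scanStop.length > 0 && regionStart.length > 0
                && Arrays.compareUnsigned(scanStop, regionStart) <= 0;
        return !(scanStartsAfterRegion || scanStopsBeforeRegion);
    }

    /** The later of two start keys; an empty start key is earlier than everything. */
    static byte[] laterStart(byte[] left, byte[] right) {
        if (left.length == 0) return right;
        if (right.length == 0) return left;
        return Arrays.compareUnsigned(left, right) >= 0 ? left : right;
    }

    /** The earlier of two stop keys; an empty stop key is later than everything. */
    static byte[] earlierStop(byte[] left, byte[] right) {
        if (left.length == 0) return right;
        if (right.length == 0) return left;
        return Arrays.compareUnsigned(left, right) < 0 ? left : right;
    }
}
```

A scan from "n" to "z" does not overlap a region covering "a" to "m", but it does overlap the last region of a table, whose stop key is empty.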

Finally, we can implement an RDD specialised for executing HBasePartitions. We want to exploit the ability to choose or influence where the partition is executed, so need access to a Scala RDD method getPreferredLocations. This method is not available on JavaRDD, so we are forced to do some Scala conversions. The Scala/Java conversion work is quite tedious but necessary when accessing low level features on a Java-only project.

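import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.EmptyRDD;
import org.apache.spark.rdd.RDD;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.reflect.ClassTag;

public class HBaseRDD<T> extends RDD<T> {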

  private static <T> ClassTag<T> createClassTag(Class<T> klass) {
    return scala.reflect.ClassTag$.MODULE$.apply(klass);
  }

  private final HBasePartitioner partitioner;
  private final String tableName;
  private final byte[] startKey;
  private final byte[] stopKey;
  private final Function<Result, T> mapper;

  public HBaseRDD(SparkContext sparkContext,
                  Class<T> klass,
                  HBasePartitioner partitioner,
                  String tableName,
                  byte[] startKey,
                  byte[] stopKey,
                  Function<Result, T> mapper) {
    super(new EmptyRDD<>(sparkContext, createClassTag(klass)), createClassTag(klass));
    this.partitioner = partitioner;
    this.tableName = tableName;
    this.startKey = startKey;
    this.stopKey = stopKey;
    this.mapper = mapper;
  }

  @Override
  public Iterator<T> compute(Partition split, TaskContext context) {
    HBasePartition partition = (HBasePartition)split;
    Scan scan = new Scan()
                    .setStartRow(partition.getStart())
                    .setStopRow(partition.getStop())
                    .setCacheBlocks(false);
    try(Connection connection = ConnectionFactory.createConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        ResultScanner scanner = table.getScanner(scan)) {
      // the scanner is lazy, so drain it before the connection closes;
      // this holds one partition's worth of mapped rows in memory
      List<T> results = StreamSupport.stream(scanner.spliterator(), false)
                                     .map(mapper)
                                     .collect(Collectors.toList());
      return JavaConversions.asScalaIterator(results.iterator());
    }
    catch (IOException e) {
      throw new RuntimeException("Region scan failed", e);
    }
  }

  @Override
  public Seq<String> getPreferredLocations(Partition split) {
    Set<String> locations = ImmutableSet.of(((HBasePartition)split).getRegionHostname());
    return JavaConversions.asScalaSet(locations).toSeq();
  }

  @Override
  public Partition[] getPartitions() {
    return partitioner.getPartitions(Bytes.toBytes(tableName), startKey, stopKey);
  }
}

As far as the interface of this class is concerned, it’s just normal Java, so it can be used from a more Java-centric Spark project, despite using some Scala APIs under the hood. We could achieve similar results with mapPartitions, but would have less control over partitioning and co-location.

HBase Connection Management

I have built several web applications recently using Apache HBase as a backend data store. This article addresses some of the design concerns and approaches taken in efficiently managing HBase connections.

One of the first things I noticed about the HBase client API was how long it takes to create the connection. HBase connection creation is effectively ZooKeeper based service discovery, the end result being a client which knows where all the region servers are, and which region server is serving which key space. This operation is expensive and needs to be minimised.

At first I only created the connection once, when I started the web application. This is very simple and is fine for most use cases.

public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(configuration);
}

This approach is great unless there is the requirement to proxy your end user when querying HBase. If Apache Ranger is enabled on your HBase cluster, proxying your users allows it to apply user specific authorisation to the query, rather than to your web application service user. This poses a few constraints: the most relevant being that you need to create a connection per user so you can’t just connect when you start your application any more.

Proxy Users

I needed to proxy users and minimise connection creation, so I built a connection pool class which, given a user principal, creates a connection as the user. I used Guava’s loading cache to handle cache eviction and concurrency. Guava’s cache also has a very useful eviction listener, which allows the connection to be closed when evicted from the cache.

In order to get the user proxying working, the UserGroupInformation for the web application service principal itself is required (see here), and you need to have successfully authenticated your user (I used SPNEGO to do this). The HBase class UserProvider is then used to create a proxy user. Your web application service principal also needs to be configured as a proxying user in core-site.xml, which you can manage via tools like Ambari.

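import java.io.Closeable;
import java.io.IOException;
import java.security.Principal;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionPool implements Closeable {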

  private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPool.class);
  private final Configuration configuration;
  private final LoadingCache<String, Connection> cache;
  private final ExecutorService threadPool;
  private final UserProvider userProvider;
  private volatile boolean closed = false;
  private final UserGroupInformation loginUser;

  public ConnectionPool(Configuration configuration, UserGroupInformation loginUser) {
    this.loginUser = loginUser;
    this.configuration = configuration;
    this.userProvider = UserProvider.instantiate(configuration);
    this.threadPool = Executors.newFixedThreadPool(50, new ThreadFactoryBuilder().setNameFormat("hbase-client-connection-pool").build());
    this.cache = createCache();
  }

  public Connection getConnection(Principal principal) throws IOException {
    return cache.getUnchecked(principal.getName());
  }

  @Override
  public void close() throws IOException {
    if(!closed) {
      closed = true;
      cache.invalidateAll();
      cache.cleanUp();
      threadPool.shutdown();
    }
  }

  private Connection createConnection(String userName) throws IOException {
      UserGroupInformation proxyUserGroupInformation = UserGroupInformation.createProxyUser(userName, loginUser);
      return ConnectionFactory.createConnection(configuration, threadPool, userProvider.create(proxyUserGroupInformation));
  }

  private LoadingCache<String, Connection> createCache() {
    return CacheBuilder.newBuilder()
                       .expireAfterAccess(10, TimeUnit.MINUTES)
            .<String, Connection>removalListener(eviction -> {
              Connection connection = eviction.getValue();
              if(null != connection) {
                try {
                  connection.close();
                } catch (IOException e) {
                  LOGGER.error("Connection could not be closed for user=" + eviction.getKey(), e);
                }
              }
            })
            .maximumSize(100)
            .build(new CacheLoader<String, Connection>() {
              @Override
              public Connection load(String userName) throws Exception {
                LOGGER.info("Create connection for user={}", userName);
                return createConnection(userName);
              }
            });
  }
}
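The essential idea, closing a connection at the moment it is evicted, does not depend on Guava. The sketch below shows the same pattern with a LinkedHashMap in access order. It is an illustration only: the class name is made up, there is no time based expiry, and unlike the Guava cache it is not thread safe, so callers would need external synchronisation.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Least-recently-used map which closes values as they are evicted. Not thread safe. */
final class ClosingLruCache<K, V extends Closeable> extends LinkedHashMap<K, V> {

    private static final long serialVersionUID = 1L;

    private final int maxEntries;

    ClosingLruCache(int maxEntries) {
        super(16, 0.75f, true);  // true = order entries by access, not by insertion
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        if (size() <= maxEntries) {
            return false;
        }
        try {
            eldest.getValue().close();  // release the resource before the entry is dropped
        } catch (IOException e) {
            // a real pool would log this, as the removal listener above does
        }
        return true;
    }
}
```

With a capacity of two, inserting connections for three users closes the one belonging to whichever of the first two was used least recently, mirroring what the removal listener does on eviction.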

One drawback of this approach is that the user experiences a slow connection the first time they query the server, or any time after their connection has been evicted from the cache. They will also observe a lag if you are sharding your application behind a load balancer without sticky sessions. If you use a round robin strategy, connection creation costs will be incurred whenever there is a new instance/user combination route.