Tuning Accumulo

Introduction

To begin, let's go over some of the parameters that can be set in Accumulo and how you can set them. Pretty much everything is "zookeeper mutable", meaning that it's stored in ZooKeeper and you can change it via the Accumulo shell using the syntax:

config -s <propname>=<propvalue>

So what can/should you change? This really depends on what you're trying to achieve. Let's start with the first thing that you should probably do, which is to turn on the data cache for your tables.

Tuning Query Performance

Enable Data Block Cache (table.cache.block.enable)

Accumulo stores data inside of rfiles which, for simplicity, are fancy B-trees. Enabling the data cache allows "blocks" of rfiles to be held in memory instead of being read from HDFS every time the table is scanned. If you're driving a real-time map with something like GeoMesa this is almost a requirement, because you are generally hitting similar key ranges by querying similar space/time boxes. Other schemes that scan by date, geo, or a similar attribute will also benefit from data caching.

How much memory you should use for the block cache depends on how big your tablet servers are. Note that when you increase the block cache you'll need to increase the -Xmx argument to the tserver Java process by the same amount. We have experimented with running at 12G on 80-node clusters and 40G on beefy 4-node clusters. I'd start with 8G if you have a reasonable cluster and go higher only if you really feel you need it and know what you're doing.

# global
config -s table.cache.block.enable=true
 
# per table (recommended?)
config -t <table> -s table.cache.block.enable=true
 
# set the size (remember to increase -Xmx accordingly)
config -s tserver.cache.data.size=8G
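
As a rough sanity check on the heap arithmetic, the sketch below adds the data cache size to a baseline heap to get the new -Xmx value. The function name and the 4G baseline are assumptions made for illustration; they are not Accumulo settings.

```python
def required_xmx_gb(baseline_heap_gb, data_cache_gb):
    """Return the tserver -Xmx (in GB) needed once the data cache is added.

    baseline_heap_gb is a hypothetical figure: the heap the tserver ran
    with before the data block cache was sized.
    """
    return baseline_heap_gb + data_cache_gb


# Hypothetical tserver that ran with a 4G heap before getting an 8G data cache.
xmx = required_xmx_gb(baseline_heap_gb=4, data_cache_gb=8)
print(f"-Xmx{xmx}G")
```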

BatchWriter Client Parameters

Accumulo BatchWriters have three parameters that affect ingest:

  • Max Memory - How much data to batch together before flushing to the tservers (basically the buffer)
  • Max Latency - How long data is allowed to live in the buffer before being flushed to the tservers
  • Num Threads - Max number of threads to use for writing data to tservers

Essentially, the buffer in the batch writer will flush when it fills up with data or when the mutations added to it become too old. If you're running with an ack-ing system like Storm, you probably want to make the data buffer large enough (100 MB or so) that it never fills, and instead rely on manual flush() calls to the batch writer to empty the buffer and ack the tuples responsible for the mutations.

Some suggested values for these, plus the writer timeout, are:

  • Max Memory - 50-100 MB
  • Max Latency - 60 seconds
  • Num Threads - 5-10
  • Timeout - 600 seconds
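
To make the flush triggers concrete, here is a toy model of the buffer: it flushes when it fills, when its oldest mutation gets too old, or when flush() is called by hand. This is only a sketch of the behavior described above; the class and its methods are hypothetical and are not the Accumulo client API.

```python
class ToyBatchBuffer:
    """Toy model of a BatchWriter buffer; not the Accumulo client API."""

    def __init__(self, max_memory_bytes, max_latency_s):
        self.max_memory_bytes = max_memory_bytes
        self.max_latency_s = max_latency_s
        self.buffered_bytes = 0
        self.oldest_ts = None  # arrival time of the oldest buffered mutation
        self.flushes = []      # (reason, bytes) pairs recorded for inspection

    def add(self, size_bytes, now_s):
        """Buffer one mutation, then flush if the buffer is full."""
        self.tick(now_s)  # age-based flush is checked first
        if self.oldest_ts is None:
            self.oldest_ts = now_s
        self.buffered_bytes += size_bytes
        if self.buffered_bytes >= self.max_memory_bytes:
            self.flush("memory")

    def tick(self, now_s):
        """Flush if the oldest buffered mutation has reached max latency."""
        if self.oldest_ts is not None and now_s - self.oldest_ts >= self.max_latency_s:
            self.flush("latency")

    def flush(self, reason="manual"):
        """Empty the buffer and record why it was flushed."""
        if self.buffered_bytes:
            self.flushes.append((reason, self.buffered_bytes))
        self.buffered_bytes = 0
        self.oldest_ts = None


MB = 1024 * 1024
buf = ToyBatchBuffer(max_memory_bytes=50 * MB, max_latency_s=60)
buf.add(30 * MB, now_s=0)
buf.add(30 * MB, now_s=1)  # buffer passes 50 MB, so it flushes on memory
buf.add(1 * MB, now_s=2)
buf.tick(now_s=62)         # oldest mutation is 60 s old, so it flushes on latency
buf.add(1 * MB, now_s=63)
buf.flush()                # explicit flush, as an ack-ing topology would do
print([reason for reason, _ in buf.flushes])
```

With an ack-ing system you want the "manual" flushes to dominate, which is why the buffer is sized so that the "memory" trigger rarely fires.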

More information can be found at https://github.com/apache/accumulo/blob/master/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java

If you are using a GeoMesa feature writer or whiptail, these parameters are controlled by system properties:

geomesa.batchwriter.latency.millis   // default 60 sec
geomesa.batchwriter.memory           // default 50 mb
geomesa.batchwriter.maxthreads       // default 10
geomesa.batchwriter.timeout.millis   // defaults to Accumulo default which is Long.MAX_VALUE

Prior to GeoMesa 1.2.0 these defaults were far too low, and you should override them by passing system properties to your JVM:

-Dgeomesa.batchwriter.latency.millis=60000 -Dgeomesa.batchwriter.memory=52428800 -Dgeomesa.batchwriter.maxthreads=10 -Dgeomesa.batchwriter.timeout.millis=600000
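
The unit conversions in that flag string are easy to get wrong (milliseconds and bytes), so a small helper can build it from human-friendly values. This is a minimal sketch; the function name is hypothetical, and only the four property names listed above are used.

```python
def geomesa_batchwriter_flags(latency_s, memory_mb, max_threads, timeout_s):
    """Build the -D flags for the GeoMesa batch writer properties listed above."""
    props = {
        "geomesa.batchwriter.latency.millis": latency_s * 1000,  # s -> ms
        "geomesa.batchwriter.memory": memory_mb * 1024 * 1024,   # MB -> bytes
        "geomesa.batchwriter.maxthreads": max_threads,
        "geomesa.batchwriter.timeout.millis": timeout_s * 1000,  # s -> ms
    }
    return " ".join(f"-D{name}={value}" for name, value in props.items())


flags = geomesa_batchwriter_flags(latency_s=60, memory_mb=50, max_threads=10, timeout_s=600)
print(flags)
```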

Other BatchWriter Notes

In general, increasing the number of batch writers increases the throughput of BatchWriter-based ingest. Note that with GeoMesa, the Z3 index and attribute indexes are generally the bottlenecks because they are the least well distributed of the indexes (at least initially). The record indexes generally seem to have good splits.

Streaming Ingest - With streaming ingest we experience a pattern we call "monotonically time increasing dataflow", meaning that the record times increase in order as data is ingested. You'll get this when using Storm and a live data feed. If you're doing MapReduce-based ingest it may not be as prevalent. The problem with this type of ingest is that it tends to concentrate writes on a few Z3 splits. Future changes to shard the Z3 index should help.

Server Side Parameters

Methodology

Tuning Accumulo should be approached methodically. There is no golden config that can be applied to every cluster. You can, however, discover through the ingest graphs and log files what's happening and why and then adjust to improve performance.

As data is written to Accumulo with BatchWriters, the data is stored in in-memory maps as well as written to the Write-Ahead Log (WAL). The in-memory map serves as a data structure that can be read by a scanner and merged with rfiles on disk to provide a comprehensive view of the data. The WAL serves as a "transactional" guarantee that the data stored in memory will not be lost. If the tserver dies unexpectedly or a Hadoop data node dies, the WAL serves as a way of recovering the data that was in memory. Fundamental to the WAL is the idea of syncing data to the disk for durability. The method of doing this is an "hsync", which is a Hadoop sync. This in turn results in an "fsync", or file sync, to the underlying filesystem (ext4 or xfs). It is important to understand that these syncs are very expensive in terms of time and are generally the limiting factor for ingest.

At some point, these in-memory maps are minor compacted which means that they are written to disk (specifically HDFS) as rfiles. This means that there can be multiple rfiles that comprise a tablet. As you can imagine, there is an upper bound on how many of these rfiles for a tablet can exist for performance reasons since they must be merged at scan time. The process of combining small rfiles on disk into larger rfiles is called major compaction.

Tuning a cluster for BatchWriter ingest is a game of ensuring that minor compactions are well managed and that WAL throughput is not being held back. Generally major compactions aren't a major issue, but if they are you should control them. Minor compactions may occur for many reasons:

  1. The maximum number of WALs for a tablet has been exceeded
  2. The maximum number of rfiles for a given tablet has been exceeded
  3. The in-memory data buffer on the tablet server has filled up and needs to be written to disk
  4. A WAL associated with a key range in the in-memory map fills up
  5. The in-memory map has been idle for a configurable amount of time

Our strategy is basically to ensure that #5 is the main reason that minor compactions happen, which should leave WAL performance as the rate-limiting factor. It is important to note that WALs on a tserver have a metadata entry per tablet on that tserver. This means that massive WAL/minc operations such as registering a new WAL can be extremely expensive, since the tserver must write on the order of one metadata table entry per tablet it serves. Thus, #1 and #4 can be very costly.

Increase In-Memory Map Size

tserver.memory.maps.max

A good starting point for the in-memory maps is 2G if you are doing light ingest or 4G if you are doing heavy ingest.

# light ingest
config -s tserver.memory.maps.max=2G
 
# heavy ingest
config -s tserver.memory.maps.max=4G
 
# a small number of very large tservers (256G RAM)
config -s tserver.memory.maps.max=8G

Make sure that your native maps are enabled (they are by default). If you are NOT using native maps, adjust your -Xmx for the tserver JAVA_OPTS accordingly. Native maps are off heap, so make sure that your allocation of memory on the server includes native maps in case you are running other services like Storm, Mesos, MapReduce, or Spark.

tserver.memory.maps.native.enabled=true
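
A quick way to budget host memory is to add the off-heap native maps to the heap. The sketch below is a simplification that ignores other JVM overhead, and the function name is hypothetical. The numbers come from the "Small Cluster, Large Servers" example at the end of this page.

```python
def tserver_footprint_gb(xmx_gb, maps_max_gb, native_maps=True):
    """Approximate memory a tserver needs on the host, in GB.

    With native maps the in-memory map lives off heap, so it is added on
    top of -Xmx. Without them the map lives inside the heap, so -Xmx must
    already be large enough to hold it.
    """
    return (xmx_gb + maps_max_gb) if native_maps else xmx_gb


# -Xmx80G, tserver.memory.maps.max=12G, 256G of RAM on the server.
footprint = tserver_footprint_gb(xmx_gb=80, maps_max_gb=12)
headroom = 256 - footprint  # what is left for the OS and other services
print(footprint, headroom)
```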

Increase Max Num Files per Tablet

table.file.max


This is not usually a problem. I'd suggest only doing this if you can verify that your tables are doing Merging Minor Compactions by looking in the logs or at the names of rfiles in HDFS. You can figure out the types of rfiles by running:

hadoop fs -ls /accumulo/tables/<tableid>/* | grep rf | awk '{n=split ($0,a,/\//);f=substr(a[n],1,1);print f}' | sort | uniq -c

One attempt to count the merging minc files is:

tableid="1td"
accumulo shell -u <user> -p <pass> -e "scan -np -t accumulo.metadata -b ${tableid} -e ${tableid}\xff -c file" | awk '{print $2}' | awk -F '/' '{print $8}' | grep '^M' | wc -l

The maximum number of files per tablet should be increased to ensure that a process known as merging minor compaction doesn't happen. A merging minor compaction takes the smallest rfile on disk and merges it with the in-memory map being minor compacted. This is very bad and can slow ingest. However, having fewer files on disk means that you have less to merge at scan time. Adjusting the compaction ratio to force more major compactions (combining small on-disk rfiles) can alleviate this without hurting ingest too much.


config -s table.file.max=25

Deciding what this number should be is tough. The best way is to look at the rfiles on disk for a table. You can do this using the hadoop fs command. The files for a table follow a naming convention (taken from accumulo docs):

  • F - Flush: entries in memory were written to a file (Minor Compaction)
  • M - Merging compaction: entries in memory were combined with the smallest file to create one new file
  • C - Several files, but not all files, were combined to produce this file (Major Compaction)
  • A - All files were compacted, delete entries were dropped
  • I - Bulk import, complete, sorted index files. Always in a directory starting with b-

So ideally, your files would be few in number, with lots of C and A, and some F. If you're doing bulk ingest then obviously you'll have I files. BUT if you see lots of M files, that is BAD.
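
If you'd rather do the tally outside of awk, the same first-letter count can be sketched in a few lines. The sample paths below are made up for illustration; real ones would come from the hadoop fs -ls output.

```python
from collections import Counter

# Naming convention from the list above.
RFILE_KINDS = {
    "F": "flush (minor compaction)",
    "M": "merging minor compaction",
    "C": "major compaction",
    "A": "all files compacted",
    "I": "bulk import",
}


def count_rfile_kinds(paths):
    """Count rfiles by the first letter of their file name."""
    counts = Counter()
    for path in paths:
        name = path.rstrip().rsplit("/", 1)[-1]
        if name.endswith(".rf"):
            counts[name[0]] += 1
    return counts


# Hypothetical listing for table id 1td.
sample = [
    "/accumulo/tables/1td/t-0001/F0000abc.rf",
    "/accumulo/tables/1td/t-0001/C0000abd.rf",
    "/accumulo/tables/1td/t-0002/M0000abe.rf",
    "/accumulo/tables/1td/t-0002/A0000abf.rf",
    "/accumulo/tables/1td/b-0003/I0000abg.rf",
    "/accumulo/tables/1td/t-0002/M0000abh.rf",
]
counts = count_rfile_kinds(sample)
for letter, n in sorted(counts.items()):
    print(letter, n, RFILE_KINDS.get(letter, "unknown"))
if counts["M"]:
    print("merging minor compactions present; consider raising table.file.max")
```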

Write-ahead Log (WAL) Size

tserver.walog.max.size & tserver.wal.blocksize

Increasing the WAL size, and the blocksize accordingly, is important for ensuring that the WALs don't fill up and cause minor compactions. Filling up a WAL is very expensive since all key extents that it references must be minor compacted, and subsequently a new WAL must be created and registered for those key extents. This means that a metadata table entry must be read and then written for possibly every tablet on a tablet server! If you have 10k tablets on a tablet server this takes a long time.

Tradeoff: The larger the WALs are, the longer it takes to recover the WAL and the entries in it if the tablet server dies. Remember that. Often, a massive failure of tservers can trigger lots of recoveries, which can run tservers with low -Xmx values (less than 12G or 16G) out of memory.

Advice: Only increase this if you notice that new WALs are being registered all the time. Increasing the number of WALs is probably a better option to try first. You'll know that you need larger WALs if the tserver debug log mentions lots of "Adding 1 logs for extent ...." every 1-2 minutes. A lot of times this happens due to Z3 ingest, so go to the tservers for the Z3 table and sort by ingest to find which one is being filled up.

Note: Even though the Accumulo documentation states this is zookeeper mutable, it appears to require a tserver restart (tdown/tup) to take effect.

The goal of the wal blocksize is to use only one block in HDFS so that the hsync/fsyncs happen on a single machine per HDFS replica (probably 3). Setting it to zero makes it 110% of the wal size.

# only if you need to...
config -s tserver.walog.max.size=2G
 
# on large servers (256G RAM)...be careful with this
config -s tserver.walog.max.size=4G
 
# note that this sets it to 110% of the value of tserver.walog.max.size
config -s tserver.wal.blocksize=0
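
The relationship between the two settings can be checked with a little arithmetic. This sketch only encodes the rule stated above (a blocksize of 0 means 110% of the WAL size); the function names are hypothetical, and it does not read anything from a live cluster.

```python
GB = 1024 ** 3


def effective_wal_blocksize(walog_max_size, wal_blocksize):
    """Return the HDFS block size used for WAL files, in bytes.

    A blocksize of 0 means 110% of tserver.walog.max.size, so that a
    whole WAL fits in a single HDFS block.
    """
    if wal_blocksize == 0:
        return walog_max_size * 11 // 10
    return wal_blocksize


def wal_fits_in_one_block(walog_max_size, wal_blocksize):
    """True when a full WAL stays within one HDFS block."""
    return effective_wal_blocksize(walog_max_size, wal_blocksize) >= walog_max_size


# Blocksize derived from a 2G WAL.
print(wal_fits_in_one_block(2 * GB, 0))
# Explicit 2G blocksize paired with a 4G WAL.
print(wal_fits_in_one_block(4 * GB, 2 * GB))
```

If you set the blocksize explicitly, it is worth rerunning this kind of check whenever you change tserver.walog.max.size.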

Number of WALs Threshold

table.compaction.minor.logs.threshold

By default, only three WALs can be registered against a single tablet (and thus an in-memory map). This is because WALs are expensive data structures to read and process. They are optimized for writing whereas rfiles are optimized for reading. The real purpose of the WAL is only durability, and thus you don't want a ton of them lying around. Accumulo controls this by forcing minor compactions when the value is exceeded, which is an especially expensive way for a minor compaction to be triggered. The trade-off of raising the threshold is, of course, longer recovery times in the event that a tserver dies or is decommissioned.

# try 6
config -t <table> -s table.compaction.minor.logs.threshold=6
 
# if you still see log statements about this increase more
config -t <table> -s table.compaction.minor.logs.threshold=10

Mutation Queue Size

tserver.mutation.queue.max

As we talked about before, the number of hsyncs or fsyncs can be the rate-limiting factor on ingest in the ideal situation, since entries to the WAL must be persisted to the underlying disk layer. To optimize this process, mutations are queued together for syncing. This parameter controls the maximum buffer size for a sync.

Note: If you are doing ingest of large mutations (such as images, large binary blobs, etc.) that are close to 1M in size you may want to increase this to 10M or so in order to efficiently batch syncs of the WAL.

config -s tserver.mutation.queue.max=2M

Concurrent Compactions

tserver.compaction.minor.concurrent.max & tserver.compaction.major.concurrent.max

If your nodes have plenty of CPU and memory, you can generally handle more compactions (major and minor) than the defaults. If you aren't under heavy query load, I'd suggest making sure that neither your major nor your minor compactions are "plateau-ing" in the overview page. This is where you see straight horizontal lines in the graph. Generally you should see spiky lines for minc/majc. If your OS load is spiking, this may be the problem and you might have gone too high.

config -s tserver.compaction.minor.concurrent.max=10
config -s tserver.compaction.major.concurrent.max=8

Recovery & Migrations

tserver.migrations.concurrent.max

When recovering WALs, decommissioning tservers, reassigning tablets from dead tservers, or turning clusters on and off, tservers generally only handle one concurrent tablet migration. Sometimes you want to speed this up and you can do so:

config -s tserver.migrations.concurrent.max=3

I recommend setting it back to the default value afterwards.

Example Config

Medium Cluster

Servers      80
Cores        12 physical, 24 virtual
NIC          2 - 10G bonded
Disks
Java Args    -Xmx30G

Config:

config -s table.cache.block.enable=true
config -s tserver.cache.data.size=12G

config -s tserver.compaction.minor.concurrent.max=9
config -s tserver.compaction.major.concurrent.max=8
config -s tserver.recovery.concurrent.max=2

config -s tserver.wal.blocksize=2G
config -s tserver.walog.max.size=2G
config -s tserver.mutation.queue.max=2M
config -s tserver.memory.maps.max=4G

config -s table.compaction.minor.logs.threshold=6
config -s table.file.max=30
 
config -s tserver.readahead.concurrent.max=32
config -s tserver.recovery.concurrent.max=4

Small Cluster, Large Servers

Servers      4
Cores        24 physical, 48 virtual
NIC          1 - 1G
Disks        6 x 1TB, 2.5 inch, unknown RPM
Memory       256G
Java Args    -Xmx80G

Config:

config -s table.cache.block.enable=true
config -s tserver.cache.data.size=40G
 
config -s tserver.compaction.minor.concurrent.max=50
config -s tserver.compaction.major.concurrent.max=8
config -s tserver.recovery.concurrent.max=4
 
config -s tserver.wal.blocksize=2G
config -s tserver.walog.max.size=4G
config -s tserver.mutation.queue.max=2M
config -s tserver.memory.maps.max=12G
 
config -s table.compaction.minor.logs.threshold=10
config -s table.file.max=30