Thursday, December 26, 2013

How big should a buffer be?


Following on free a previous post on buffer bloat, a good question is: how big should the socket buffer size be? Using my simple JStringServer code, I ran some tests and plotted some graphs and found a sweet spot (for me. YMMV).

As a bit of an aside, the R-language was used to generate the graphs, a task for which it is very well suited. I'm no R expert, so this can probably be done better. But this is what it looks like:

# Load the data from a CSV file with headers
mydata <- read.table("~/Documents/Docs/bufferBloat.txt", header=TRUE, sep=",")

# Aggregate the data and calculate the mean and standard deviation.
# Note: the do.call is to make the data into the right type

attach(do.call(data.frameag <- aggregate(. ~ SO_RCVBU, mydata, function(x) c(mean = mean(x), sd = sd(x)))), warn.conflicts=FALSE)

# Plot the graph calls-per-second (cps) against the server-side SO_RCVBUF 
# Note that the x-axis (SO_RCVBUF) uses a logarithmic scale
plot(SO_RCVBUcps.meanlog="x", ylim=c(3000, 5000), ylab="calls per second")

# add the title
title(main="Mac Book Pro client calls/second vs. server-side SO_RCVBU")

# Add the standard deviations using simple lines
# see http://stat.ethz.ch/R-manual/R-devel/library/graphics/html/segments.html
segments (SO_RCVBU, cps.mean - cps.sd, SO_RCVBU, cps.mean + cps.sd)

# copy the screen to disk (don't forget to close the file handle)
# see http://stackoverflow.com/questions/7144118/how-to-save-a-plot-as-image-on-the-disk
dev.copy(jpeg,filename="~/Documents/Docs/bufferBloat_cps_vs_SO_RCVBU.jpg")
dev.off()

# Now much the same for call duration vs. SO_RCVBUF
plot(SO_RCVBUduration.meanlog="x", ylab="calls duration (ms)")
title(main="Mac Book Pro client call time vs. server-side SO_RCVBUF")
dev.copy(jpeg,filename="~/Documents/Docs/bufferBloat_duration_vs_SO_RCVBU.jpg")
dev.off()

Call times
Call time (ms) vs SO_RCVBUF value

Throughput
Number of calls per second vs. SO_RCVBUF
The results were taken from a (old-ish) Mac Book Pro while a (new-ish) Mac Book Air was also stressing the (Linux) server.

The results show that the optimal size for SO_RCVBUF for this application is about 5000. A buffer size too small cripples throughput. But the throughput peaks quite quickly and further increasing it does not seem to help throughput.

Note: significantly increasing the buffer size does not terribly impact performance but I noticed that the client would occasionally throw this nasty exception:

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
at sun.nio.ch.IOUtil.read(IOUtil.java:169)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
at com.google.code.jstringserver.client.WritingConnector.read(WritingConnector.java:88)
at com.google.code.jstringserver.client.WritingConnector.connected(WritingConnector.java:55)
at com.google.code.jstringserver.client.ConstantWritingConnector.connected(ConstantWritingConnector.java:74)
at com.google.code.jstringserver.client.Connector.doCall(Connector.java:39)
at com.google.code.jstringserver.client.ConstantWritingConnector.doCall(ConstantWritingConnector.java:58)
at com.google.code.jstringserver.client.Networker.run(Networker.java:18)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)

(Cause to be determined).

This only happened for large buffer sizes.

Further R reading

R-Statistics blog.
StatMethods

Saturday, December 21, 2013

FastLocks in the JVM - part 2

I was wondering why AtomicIntegerArrays etc were slow. Alternatively, why are other methods (notably using synchronized) so fast? Now, I'm not a machine code guru nor a JVM coder so please take some of this post with a pinch of salt. Here are my findings.

Let's write some Java code that plugs into the same simple framework I mentioned in the previous post by extending our AbstractReaderRunnable class but this time, let the reader access a plain, old int[] array in a synchronized method (similarly for the writer).

There's nothing clever in the Java code, but this is how the machine code generated by HotSpot looks:

.
.
08a    andq    R10, #7 # long
08e    cmpq    R10, #5
092    jne     B20  P=0.000001 C=-1.000000
.
.

(where R10 points to memory in the stack).

Here, we're checking the 3 lowest order bits (#7) of R10. I believe the most significant bit refers to whether biased locking is enabled for an object of this class and "as long as an object is unlocked, the last two bits have the value 01." [1] "These bits are commonly referred to as the object sync bits." [3].

This analysis seems to be confirmed by Dave Dice, who wrote much of this, when he describes "the operation of stack-locking[:] The inlined enter code emitted by the JIT will first fetch and examine the mark word. (That code first checks to see if the object is biased)... If the mark word is neutral as determined by the low-order bits of the mark, we'll try to use stack locking. First, in anticipation of a successful compare-and-swap (CAS), the code will store the just-fetched mark value into the on-frame word that was allocated by the JIT and is associated with that particular bytecode offset. Next, the inline enter code will attempt to CAS the address of that on-frame location over the mark word. If successful, we've locked the object." [6]

So, we're checking whether the object allows biased locking and is unlocked. If so, we jump to:

1af   B20: # B7 B21 <- 0.351352="" b19="" b5="" div="" nbsp="" req:="">
1af    leaq    R11, [rsp + #48] # box lock
1b4    fastlock RBP,R11,RAX,R10
205    je     B7  P=0.999999 C=-1.000000

Fastlock is not an x86 instruction but inlined JVM code that performs the locking [2]. I've not been able to find exactly what this refers to in the OpenJDK source code, so I had to resort to Googling.

"Synchronization on Java objects is done using a fast lock mechanism using light-weight lock records (referred to as fast locks) in most cases and only done using a real lock mechanism (referred to as inflated locks) when needed. The premise behind this implementation is that contention on Java locks are rare. Hence, there is no need to associate an inflated lock with the object until contention occurs. Using an inflated lock tends to be slower than just using a fast lock.

"When a thread attempts to lock the object, it uses atomic operations to check and set the sync bits as well as the header word. If the sync bits are CVM_LOCKSTATE_UNLOCKED, then there is no contention on this object yet. Within the same atomic operation, the header word will be replaced with a pointer to a fast lock record and the sync bits are set to CVM_LOCKSTATE_LOCKED ... If the same thread attempts to re-enter the lock on this object, it will find that the sync bits are already set to CVM_LOCKSTATE_LOCKED, and check to see if it is the owner of the fast lock. Since the current thread does own this fast lock, it simply increments the reentry count in the fast lock record and proceed with code execution." [3]

So, no expensive lock instructions in the uncontended case - unlike in AtomicIntegerArrays.

"If a different thread attempts to acquire the lock on this object, it will check and see that it is not the owner of the fast lock record. This is considered a contention case which will trigger the inflation of the lock." [3] "Deflation occurs at all stop-the-world safepoints" which occur frequently. There are 3 safepoints in the JITed read method alone.

In the inflated fast-path, "there's no need to block or wake threads (which requires 1 atomic for an enter-exit pair)." [6] ("The slow-path implementation is in native C++ code while the fast-path is emitted by the JITs.") The ObjectMonitor class (in objectMonitor.hpp)  has a linked list of ObjectWaiter objects which "serves as a "proxy" or surrogate thread" and SpinCallbackArguments (in objectMonitor.cpp) that allow Adaptive Spinning [7] which uses a spin-then-block strategy based on measured success. This is currently the limit of my knowledge on how the JVM manages high throughput in this area.

So, why care?

Dice describes the JVM's locking as "considerably better (lower latency and better scalability) than native pthreads synchronization implementations." [6] So, why isn't it used everywhere?

Well, as ever, there is a trade-off. It's described as "optimized for system-wide throughput at the expense of short-term thread-specific fairness" [4] and "HotSpot favors throughput [and] favor[s] recently-run threads" [5].

We can see this quite easily. Given a writer class that looks a little like this:

public abstract class AbstractWriterRunnable implements Runnable {
    
    protected int index = 0;
    protected int lap = 1;
    private long duration;

    @Override
    public void run() {
        long start = System.currentTimeMillis();
        while (index < ARRAY_SIZE) {
            setElementAndIncrementCounters();
        }
        duration = System.currentTimeMillis() - start;
    }

    private void setElementAndIncrementCounters() {
        setElement();
        index++;
    }
    
    protected abstract void setElement();
}

that runs in one thread and a reader class that looks like this:

public abstract class AbstractReaderRunnable implements Runnable, Detailed {
    
    protected int index = 0;
    protected int lap = 0;
    private long duration;
    private int doNotCompileMeAway;

    @Override
    public void run() {
        long start = System.currentTimeMillis();
        while (index < ARRAY_SIZE) {
            lap = elementAtIndex();
            while (lap == 0) {
                lap = elementAtIndex();
                doNotCompileMeAway |= lap;
            }
            index++;
        }
        duration = System.currentTimeMillis() - start;
    }
    
    protected abstract int elementAtIndex();
.
.

that runs in another thread, we can implement setElement and elementAtIndex in subclasses, one pair to read and write to an array in synchronized methods and another to get and set on an AtomicIntegerArray.

No Free Meal

The results are very interesting. After one run that throw away the results (as ever) to let the JVM warm up, they consistently look something like this:

Synchronized Array Access
read thread took 9550ms, writer thread took 6516ms

AtomicIntegerArray Access
read thread took 74693ms, writer thread took 74693ms

Synchronized array access is consistently better over all my runs but notice the difference between the times for read and write threads - the synchronized reader is 3 seconds behind the writer while the AtomicIntegerArray reader is hot on its writer's heals.

Presumably, is an example of synchronized access sacrificing "short-term thread-specific fairness" for increased throughput.


[1] Synchronization and Object Locking.
[2] "Deep dive into assembly code from Java" -Kohsuke Kawaguchi's blog.
[3] CDC HotSpot Implementation.
[4] Synchronization in Java SE 6 (HotSpot).
[5] Synchonrization.
[6] "Lets say you're interested in using HotSpot as a vehicle for synchronization research" - Dave Dice's blog.
[7] Java SE6 Performance White Paper.


Friday, December 20, 2013

FastLocks in the JVM

I mentioned how AtomicIntegerArray and its kin can be slow. I'm now a little closer to finding why.

I decompiled my code that has one thread writing to and one thread reading from an AtomicIntegerArray using the JVM arguments:

-Xmx32g -server -XX:+PrintOptoAssembly -XX:CompileCommand=print,*Runnable -XX:CompileThreshold=10000

Using Java version:

$ /usr/java/jdk1.7.0_10/bin/java -version
java version "1.7.0_10"
Java(TM) SE Runtime Environment (build 1.7.0_10-b18)
Java HotSpot(TM) 64-Bit Server VM (build 23.6-b04, mixed mode)

My code is testing reading and writing to an array structure. Initially, all values are 0. 

The writer thread goes through the array setting the values to 1. 

The reader thread follows the writer as best it can. If an element is 0 (that is, uninitialized) it spins waiting for the value to change to 1.

The Java code for reading looks something like this in the super class:

public abstract class AbstractReaderRunnable implements Runnable {    
    protected int index = 0;
    protected int lap = 0;

    @Override
    public void run() {

        while (index < ARRAY_SIZE) {
            lap = elementAtIndex();
            while (lap == 0) {
                lap = elementAtIndex();
            }
            index++;
        }
    }

    protected abstract int elementAtIndex();


and in my particular subclass (that is, a class for which the array structure is a AtomicIntegerArray) looks like this:


class AtomicReaderRunnable extends AbstractReaderRunnable {

    private final AtomicIntegerArray array;

    protected int elementAtIndex() {
        return array.get(index);
    }
.
.

Fairly simple. Now comes the hard part: the machine code.

Note: "In Intel syntax the first operand is the destination, and the second operand is the source whereas in AT&T syntax the first operand is the source and the second operand is the destination." [1]

HotSpot by default appear to print the assembly in Intel format, although you can change this [2]

069     movl    R10, [R13 + #24 (8-bit)]        # int ! Field com/phenry/concurrent/lockless/AbstractReaderRunnable.index
06d     movq    R8, [R11 + #16 (8-bit)] # ptr ! Field java/util/concurrent/atomic/AtomicIntegerArray.array
.
.
So, the R10 registry is our index field and R8 points to the int[] array in the Java AtomicIntegerArray class.
.
.
094     movl    R11, [R8 + #24 + R10 << #2]     # int
099     MEMBAR-acquire ! (empty encoding)
099
099     movl    [R13 + #28 (8-bit)], R11        # int ! Field com/phenry/concurrent/lockless/AbstractReaderRunnable.lap

In English: here we multiply R10 by 4 by shifting it twice to the left (<< #2) - because ints are 4 bytes wide - and apparently add this to the start of the array to find our memory address. The contents of this we put into the R11 registry. In turn, this is put in the memory address where we store our lap field in the Java world.

Note that HotSpot appears to have inlined methods [6]. The code generated by Hotspot is not necessarily isomorphic to the original Java code. Also note that it might recompile it to machine code more than once, apparently depending on the usage profile.

Now, the thing of interest is the MEMBAR-acquire ! (empty encoding) line. This is not an x86 instruction and doesn't even take an address (the left-most column). So, why is it there?

"Each CPU has its own peculiar memory-barrier instructions, which can make portability a challenge" [3] The x86 hardware has a NoOp for most membar operations:

void LIR_Assembler::membar() {
  // QQQ sparc TSO uses this,
  __ membar( Assembler::Membar_mask_bits(Assembler::StoreLoad));
}

void LIR_Assembler::membar_acquire() {
  // No x86 machines currently require load fences
  // __ load_fence();
}

void LIR_Assembler::membar_release() {
  // No x86 machines currently require store fences
  // __ store_fence();
}

void LIR_Assembler::membar_loadload() {
  // no-op
  //__ membar(Assembler::Membar_mask_bits(Assembler::loadload));
}

void LIR_Assembler::membar_storestore() {
  // no-op
  //__ membar(Assembler::Membar_mask_bits(Assembler::storestore));
}

void LIR_Assembler::membar_loadstore() {
  // no-op
  //__ membar(Assembler::Membar_mask_bits(Assembler::loadstore));
}

void LIR_Assembler::membar_storeload() {
  __ membar(Assembler::Membar_mask_bits(Assembler::StoreLoad));
}

(from Hotspot's source code that can be found in hotspot/src/cpu/x86/vm/c1_LIRAssembler_x86.cpp).

The membar method itself looks like this:

  // Serializes memory and blows flags
  void membar(Membar_mask_bits order_constraint) {
    if (os::is_MP()) {
      // We only have to handle StoreLoad
      if (order_constraint & StoreLoad) {
        // All usable chips support "locked" instructions which suffice
        // as barriers, and are much faster than the alternative of
        // using cpuid instruction. We use here a locked add [esp],0.
        // This is conveniently otherwise a no-op except for blowing
        // flags.
        // Any change to this code may need to revisit other places in
        // the code where this idiom is used, in particular the
        // orderAccess code.
        lock();
        addl(Address(rsp, 0), 0);// Assert the lock# signal here
      }
    }
  }

(from assembler_x86.hpp in the HotSpot source code).

And true to the comment, we do see lines output like:

     lock addl [rsp + #0], 0 ! membar_volatile

but it's in the writer code! The writer code looks like this:

070     movl    R8, [RCX + #28 (8-bit)] # int ! Field com/phenry/concurrent/lockless/AbstractWriterRunnable.lap
.
.
078     movl    R11, [RCX + #24 (8-bit)]        # int ! Field com/phenry/concurrent/lockless/AbstractWriterRunnable.index
07c     movq    R10, [R10 + #16 (8-bit)]        # ptr ! Field java/util/concurrent/atomic/AtomicIntegerArray.array
.
.
Similar to before we have a register that's our index field (R11) and  the int[] array in the Java AtomicIntegerArray class (R10). Also similar to before, we calculate the address that this index points to in the array but this time we populate its value with R8 (our lap field in our Java code).
.
.
08e   B8: #     B4 B9 <- 287336="" b7="" font="" nbsp="" req:="">
08e     MEMBAR-release ! (empty encoding)
08e
08e     movslq  R11, R11        # i2l
091     movl    [R10 + #24 + R11 << #2], R8     # int
096     lock addl [rsp + #0], 0 ! membar_volatile

As before we have the (redundant) MEMBAR pseudo instruction but we also have the very significant lock prefix.

The reader code does not need to worry since the membar has flushed the store buffers (which "enable our fast processors to run without blocking while data is transferred to and from the cache sub-system" [7]. These fences are more related to instruction ordering - another side effect of the synchronize semantics - than the cache subsystem). Furthermore:

"If a core wants to read some memory, and it does not have it ... then it must make a read on the ring bus.  It will then either be read from main-memory if not in the cache sub-systems, or read from L3 if clean, or snooped from another core if Modified.  In any case the read will never return a stale copy from the cache sub-system, it is guaranteed to be coherent." [7]

This seems to be a standard way of synchronizing outside the JVM [5]. So, why is it so slow? The JVM has some very clever code to avoid this heavy-weight approach that I will go into in a future post.

[1] http://www.ibiblio.org/gferg/ldp/GCC-Inline-Assembly-HOWTO.html
[2] http://stackoverflow.com/questions/9337670/hotspot7-hsdis-printassembly-intel-syntax
[5] StackOverflow.
[6] "Deep dive into assembly code from Java" -Kohsuke Kawaguchi's blog.
[7] "CPU Cache Flushing Fallacy" - Martin Thompson's blog.


Sunday, December 8, 2013

Synching Sites

A particular data centre of 40 nodes we have must also send all the data to a secondary, backup site. We use Oracle Coherence's Push Replication pattern to keep these two sites synchronised. How does it do this and how do we write tests for it?

Push Rep uses Coherence*Extend servers which "allows you to access remote Coherence caches using standard Coherence APIs" [1]. It uses TCP to do this, not TCMP (an asynchronous protocol that uses UDP for data transferal) as Coherence normally uses.

Typically, you should "configure Coherence*Extend proxy servers on a dedicated set of machines that server no other purpose. This will allow you to scale your proxy servers independently from the storage-enabled members of the cluster... Coherence*Extend proxy servers are full-blown TCMP members, so they should have fast and reliable communication with the rest of the cluster." [1]

This is not possible nor desirable for our LittleGrid testing since we're just testing functionality. We will, however, need some of the production-like config. On the Extend Server side, we need:
    <caching-schemes>
        <proxy-scheme>
            <service-name>ExtendTcpProxyService</service-name>
            <thread-count>5</thread-count>

            <acceptor-config>
                <tcp-acceptor>
                    <local-address>
                        <address>localhost</address>
                        <port>32199</port>
                    </local-address>
                </tcp-acceptor>
[Note: I reduced the thread count for my test. This value is 50 in production].

On the side of the cluster that is a client of this Extend server (the production site in our case), the config looks like:

   <caching-schemes>
        <remote-invocation-scheme>
            <scheme-name>remote-contingency-invocation</scheme-name>
            <service-name>RemoteContingencyInvocationService</service-name>
            <initiator-config>
                <tcp-initiator>
                    <remote-addresses>
                        <socket-address>
                            <address>localhost</address>
                            <port>32199</port>
                        </socket-address>
                    </remote-addresses>
                    <connect-timeout>30s</connect-timeout>
                </tcp-initiator>

Instead of the usual method of executing code against the data using EntryProcessors, PushRep executes an Invocable against an InvocationService. (Your implementation of Invocable can execute the EntryProcessors you know and love on the remote InvocationService but note that InvocationService.query does not take an affined key as execution of EntryProcessors do).

As for LittleGrid configuration, your primary config will need something like:

        Builder builder = newBuilder()
            .setCacheConfiguration("FILE_WITH_CLIENT_XML_ABOVE")
.
.

and the Extend server config appears to need something like:

            newBuilder().setStorageEnabledExtendProxyCount(1)...

[1] Oracle Coherence 3.5.

Sunday, November 24, 2013

Bufferbloat: less is more

You would have thought that increasing buffer sizes was always a good thing, right? Wrong.

You would have thought that reducing load on a system would always make it faster, right? Also wrong.

When stress testing our code in an Oracle lab in Edinburgh, we noticed that increasing the load on the system increased throughput. Independently, on totally different software (nothing in common other than it's written in Java and some of it's running on Linux) I saw the same thing on my home network.

In both cases, a large network buffer size and low load was the problem. At home, I saw this:

Initiated 7855 calls. Calls per second = 846. number of errors at client side = 0. Average call time = 81ms
Initiated 9399 calls. Calls per second = 772. number of errors at client side = 0. Average call time = 89ms
Initiated 10815 calls. Calls per second = 708. number of errors at client side = 0. Average call time = 96ms
.
.

etc until I started a second machine hitting the same single-threaded process whereupon performance shot up:

Initiated 18913 calls. Calls per second = 771. number of errors at client side = 0. Average call time = 107ms
Initiated 21268 calls. Calls per second = 1177. number of errors at client side = 0. Average call time = 105ms
Initiated 24502 calls. Calls per second = 1617. number of errors at client side = 0. Average call time = 99ms
Initiated 29802 calls. Calls per second = 2650. number of errors at client side = 0. Average call time = 88ms
Initiated 34192 calls. Calls per second = 2195. number of errors at client side = 0. Average call time = 82ms
Initiated 39558 calls. Calls per second = 2683. number of errors at client side = 0. Average call time = 77ms

How odd - more load on the server means better throughput.

I was browsing the subject of bufferbloat on various websites including Jim Getty's excellent blog [1] where he writes extensively on the topic. He says:

"... bloat occurs in multiple places in an OS stack (and applications!). If your OS TCP implementation fills transmit queues more than needed, full queues will cause the RTT to increase, etc. , causing TCP to misbehave."

Inspired by this, I added to my code:

        serverSocketChannel.setOption(
            SO_RCVBUF,
            4096);

before binding the channel to an address and the problem went away (the default value for this option was about 128kb on my Linux box).

Note that although this looks like a very small number, there is no fear of a buffer overrun.

"The TCP socket received buffer cannot overflow because the peer is not allowed is not allowed to send data beyond the advertised window. This is TCP's flow control" [2].

Curious to see why reducing the buffer size helps things, I tried sizes of 512, 1024, 2048 and so on until 65536 bytes while running

sudo tcpdump -nn -i p7p1 '(tcp[13] & 0xc0 != 0)'

which according to [3] should show me when the network experiences congestion (p7p1 is the name of my network interface, by the way).

The first value for SO_RCVBUF at which poor initial performance is encountered was 8192 bytes. Interestingly, as soon as the second client started hitting the server, tcpdump started spewing output like:

17:54:28.620932 IP 192.168.1.91.59406 > 192.168.1.94.8888: Flags [.W], seq 133960115:133961563, ack 2988954847, win 33304, options [nop,nop,TS val 620089208 ecr 15423967], length 1448
17:54:28.621036 IP 192.168.1.91.59407 > 192.168.1.94.8888: Flags [.W], seq 4115302724:4115303748, ack 2823779942, win 33304, options [nop,nop,TS val 620089208 ecr 15423967], length 1024
17:54:28.623174 IP 192.168.1.65.51628 > 192.168.1.94.8888: Flags [.W], seq 1180366676:1180367700, ack 1925192901, win 8688, options [nop,nop,TS val 425774544 ecr 15423967], length 1024
17:54:28.911140 IP 192.168.1.91.56440 > 192.168.1.94.8888: Flags [.W], seq 2890777132:2890778156, ack 4156581585, win 33304, options [nop,nop,TS val 620089211 ecr 15424257], length 1024

What can we make of this? Well, it appears that the bigger the buffer, the longer a packet can stay in the receiver's queue as Getty informs us [1]. The longer it stays in the queue, the longer the round trip time (RTT). The longer the RTT, the worse the sender thinks the congestion is as it doesn't differentiate between time lost on the network and time stuck in a bloated stupid FIFO queue. (The RTT is used in determining the congestion [4])

Given a small buffer, the receiver will, at a much lower threshold, tell the sender not to transmit any more packets [2]. Thus the queue is smaller and less time is spent in it. As a result, the RTT is low and the sender believes the network to be congestion-free and is inclined to send more data.

Given a larger buffer but with greater competition for resources (from the second client), the available space in the buffer is reduced so it things look very similar to the client as described in the previous paragraph.

It appears that the Linux community are wise to this and have taken countermeasures [5].

[1] JG's Ramblings.
[2] Unix Network Programming, p58, Stevens et al, p207
[3] Wikipedia.
[4] RFC 5681.
[5] TCP Small Queues.

To Nagle or Not

Some TCP properties Java programmers have control over, others they don't. One optimisation is in Socket.setTcpNoDelay, which is to do with Nagle's algorithm (note: calling setTcpNoDelay with true turns the algorithm off). Basically, this tells the OS to batch your packets.

When you should turn it on or off depends very much on what you are trying to do [1]. Jetty sets no delay to true (that is, turns the algorithm off):

Phillips-MacBook-Air:jetty-all-8.1.9.v20130131 phenry$ grep -r setTcpNoDelay .
./org/eclipse/jetty/client/SelectConnector.java:            channel.socket().setTcpNoDelay(true);
./org/eclipse/jetty/client/SocketConnector.java:        socket.setTcpNoDelay(true);
./org/eclipse/jetty/server/AbstractConnector.java:            socket.setTcpNoDelay(true);
./org/eclipse/jetty/server/handler/ConnectHandler.java:            channel.socket().setTcpNoDelay(true);
./org/eclipse/jetty/websocket/WebSocketClient.java:        channel.socket().setTcpNoDelay(true);

Playing around with my own simple server, I experimented with this value. I set up a single thread on a 16-core Linux box using Java NIO that services requests sent from 2 MacBooks that each had 100 threads using normal, blocking IO and sending 10 010 bytes of data (the server replies with a mere 2 bytes).

Setting the algorithm on or off on the server made no discernible difference. Not surprising as 2 bytes are (probably) going to travel in the same packet. But calling socket.setTcpNoDelay(false) on the clients showed a marked improvement. Using a MacBook Pro (2.66GHz, Intel Core 2 Duo) as the client, the results looked like:

socket.setTcpNoDelay(true)

Mean calls/second:      3884
Standard Deviation:     283
Average call time (ms): 29

socket.setTcpNoDelay(false)

Mean calls/second:      5060
Standard Deviation:     75
Average call time (ms): 20

Your mileage my vary.

The big difference was the time it took to call SocketChannel.connect(...). This dropped from 20 to 13 ms.

As an aside, you can see Linux's network buffers filling up with something like:

[henryp@corsair ~]$ cat /proc/net/tcp | grep -i 22b8 # where 22b8 is port 8888 on which I am listening
   2: 5E01A8C0:22B8 00000000:0000 0A 00000000:00000012 02:0000000D 00000000  1000        0 1786995 2 ffff880f999f4600 99 0 0 10 -1                   
   3: 5E01A8C0:22B8 4101A8C0:F475 03 00000000:00000000 01:00000062 00000000  1000        0 0 2 ffff880f8b4fce80                                      
   4: 5E01A8C0:22B8 4101A8C0:F476 03 00000000:00000000 01:00000062 00000000  1000        0 0 2 ffff880f8b4fcf00 
.
.
  76: 5E01A8C0:22B8 5B01A8C0:D3A5 01 00000000:0000171A 00:00000000 00000000  1000        0 4035262 1 ffff880fc3ff8700 20 3 12 10 -1                  
  77: 5E01A8C0:22B8 5B01A8C0:D3AD 01 00000000:0000271A 00:00000000 00000000     0        0 0 1 ffff880fc3ffb100 20 3 12 10 -1                        
  78: 5E01A8C0:22B8 5B01A8C0:D3B4 01 00000000:00000800 00:00000000 00000000     0        0 0 1 ffff880e1216bf00 20 3 12 10 -1                        
  79: 5E01A8C0:22B8 5B01A8C0:D3A8 01 00000000:0000271A 00:00000000 00000000     0        0 0 1 ffff880fc3ff9500 20 3 12 10 -1                        
  80: 5E01A8C0:22B8 5B01A8C0:D3AC 01 00000000:0000271A 00:00000000 00000000     0        0 0 1 ffff880fc3ffe200 20 3 12 10 -1                        
  81: 5E01A8C0:22B8 5B01A8C0:D3B3 01 00000000:0000271A 00:00000000 00000000     0        0 0 1 ffff880e12169c00 20 3 12 10 -1                        
  82: 5E01A8C0:22B8 4101A8C0:F118 01 00000000:00000000 00:00000000 00000000  1000        0 4033066 1 ffff880e1216d400 20 3 0 10 -1 

Note 271A is 10010 - the size of our payload.

[1] ExtraHop blog.

Sunday, November 10, 2013

A Comparison of Simple Locking Strategies

I've been playing with locks again and made a comparison between 5 naive implementations. Putting aside arguments about how valuable microbenchmarks are, the results are interesting.

In each case, there were as many threads as CPUs on my machine (a 16 core Linux box) and all each thread wants to do is attain the lock, increment a counter until it reaches a point and release the lock.

Each test was run twice with the first result thrown away to allow the JVM to start up. There was also a 4 second pause before the run to avoid biased locking issues. Averages were taken over 10 runs of 2 000 000 iterations.

These are the five strategies:

SynchronizedIncrementer

Perhaps the simplest implementation is:

            synchronized (this) {
                counter++;
            }

a pessimistic lock approach.

AtomicReferenceIncrementer

Next is an optimistic lock approach with atomics:

            boolean set = false;
            while (!set) {
                Integer expected = counter.get();
                set = counter.compareAndSet(expected, new Integer(expected + 1));
            }

LockingIncrementer

Next is using Java's concurrent classes:

            lock.lock();
            try {
                counter++;
            } finally {
                lock.unlock();
            }

SpinLock

The next spins until a flag allows it to proceed.

            while (!atomicBoolean.compareAndSet(false, true)) {
                Thread.yield();
            }
            
            counter++;
            int myCounter = counter;
            
            if (!atomicBoolean.compareAndSet(true, false)) {
                throw new IllegalStateException();
            }

AtomicReferenceIncrementer

This uses an AtomicReference that holds an Integer and is otherwise similar to the AtomicIncrementer.

The Results

There isn't a huge amount between the strategies in this particular test:

                              Mean (ms) Standard Deviation
                              ========  ==================
AtomicIncrementer             2170      149.626535
SynchronizedIncrementer       2475      53.230630 
AtomicReferenceIncrementer    3319      275.069809
SpinLock                      3519      713.212030
LockingIncrementer            3690      244.545292

On my hardware at least, the optimistic AtomicInteger approach is fastest with the synchronized block offering the most predictable performance. However, there is not much between them.

The interesting thing is if you run the same test with just one thread, typically the results look like this:

Time took 7ms  for [SynchronizedIncrementer, counter = 2000000]
Time took 20ms for [AtomicIncrementer, counter = 2000000]
Time took 21ms for [AtomicReferenceIncrementer, counter = 2000000]
Time took 23ms for [SpinLock, counter = 2000000]
Time took 29ms for [LockingIncrementer, counter = 2000000]

Much faster and it's doing exactly the same amount of work!

Conclusion

Avoid multi-threading if you can help it. Use it only when it demonstrably speeds things up. Even then, try to architect your system so there is no contention in the first place.

Saturday, November 9, 2013

Journeys in Networks

After stress testing my pet project, JStringServer, I initially thought I'd made a big boo-boo as the performance was appalling. However, it turned out that my home router was not up to the job. So, on a friend's recommendation, I bought a TP-Link 5-Port Gigbait Desktop Switch. Performance was better but not that great. A quick Google showed I needed Cat 6 cables to make full use of it. D'oh.

OK, so after a few trips to my local electronics store, I set up a newish 1.8GHz i7 Mac Book Air (with a USB network adaptor) and an old 2009 Mac Book Pro trying to hammer my 16 core Linux desktop running my Java NIO server code.

The strategy JStringServer was using was one-thread-does-everything (code here). That is, it listens to the selector associated with the server socket, associates any clients who have connected with a second selector dedicated to clients, checks this second selector for any activity and services them. Although htop shows this thread to be very busy, the rest of the system was doing next to nothing.

The clients were averaging about 6 000 calls per second between them. Now, with a back-of-a-beer-mat calculation,  a payload of about 10 000 bytes (ignoring the 2 bytes return from the server) and 6 000 calls per second, this means the network was taking something like 480 gigabits/second (10 000 * 6 000 * 8 / 1 000 000). Not bad, but why not better?

TcpDump

Since JStringServer is currently using just TCP, it turns out that there is a lot of overhead on the network acknowledging the packets the client is sending the server.

If we run tcpdump and capture its output thus:

$ sudo tcpdump -nn host 192.168.1.94 and 192.168.1.65 -i p7p1 > tcpdump_jstringserver_2machines_normalUse.txt 

we see as many packets are going to the server (192.168.1.94) as the other way:

$ grep -c "192.168.1.94.8888 >" tcpdump_jstringserver_2machines_normalUse.txt 
1996027
$ grep -c "> 192.168.1.94.8888" tcpdump_jstringserver_2machines_normalUse.txt 
2005298

So, the figure of 480 gigabits/second seems to be as good as we're going to get on this particular hardware using TCP (2 * 480 ~ 1 gigabit limit).

The return packets that carry the acknowledgement can also carry data [1]. There show up in tcpdump as [P.] where P stands for a PUSH of data and '.' represents an acknowledgement [2]. But since in this particular example, our server replies with very terse responses compared to very verbose requests, this doesn't save us much. A lot of packets are wasted just acknowledging:

$ grep -c -P "192.168.1.94.8888 \>.* \[\.\], ack \\d\\d" tcpdump_jstringserver_2machines_normalUse.txt 
1427585

That's about 70% of all traffic from the server to the client.

Another problem with TCP is the handshake uses a lot of packets (as a percentage of the total package used in a connection).

For SYN:

$ grep -c " \[S\]" tcpdump_jstringserver_2machines_normalUse.txt 
120675

For SYN-ACK

$ grep -c " \[S\.\]" tcpdump_jstringserver_2machines_normalUse.txt 
118371

and for ACK (handshake only):

$ grep -c -P "\[\.\], ack 1," tcpdump_jstringserver_2machines_normalUse.txt 
113403

That totals 17% of the total traffic. In this particular example, this connection pooling would solve this. 

[1] Does tcp send a syn-ack on every packet or only on the first connection StackOverflow.
[2] tcpdump man pages


Sunday, November 3, 2013

The other side of the Fence

Memory Barriers and Memory Fences are often used in the literature of the Java Memory Model, but what exactly are they?

Firstly, they are synonyms:

"To prevent the reordering of operations resulting from write buffering, modern architectures provide a special memory barrier instruction (sometimes called a memory fence) that forces outstanding operations to take effect. It is the programmers responsibility to know where to insert a memory barrier.... Not surprisingly, memory barriers are expensive, about as expensive as an atomic compareAndSet() instruction... In fact, synchronization instructions such as getAndSet() or compareAndSet() described in earlier chapters include a memory barrier on many architectures, as do reads and writes to volatile fields." [1]

It's interesting that compareAndSet is regarded as slow since most Java programmers I know seem to think they are more efficient (although this is not born out on the hardware I've been playing with where it appears to be comparable to using synchronized blocks). This could be why the Java allows you to change these values without incurring the ordering costs (see weakCompareAndSet and lazySet at the API JavaDocs).

Secondly, not all memory barriers are the same. Doug Lea categorises them [2] and says:

"A property of memory barriers that takes some getting used to is that they apply BETWEEN memory accesses. Despite the names given for barrier instructions on some processors, the right/best barrier to use depends on the kinds of accesses it separates. [The Java abstractions] map pretty well to specific instructions (sometimes no-ops) on existing processors:"

Of course, one way to massively improve performance is not to contend for a shared value in the first place. Marc Brooker presents some nice evidence [3] where parallelizing code massively slows it down because of all the contention. He also gives a good demonstration of interpreting the sometimes esoteric results from perf stat. This might not come as a surprise to readers of Martin Thompson's blog [4] where he advocates the Single Writer Principle.

Further Reading

The Fences class JavaDocs (documentation only, not planned for any release).

Doug Lea chatting about the JMM and what falls outside of it here.

[1] The Art of Multiprocessor Programming.

[2] Doug Lea's The JSR-133 Cookbook for Compiler Writers.

[3] Marc Brooker's blog.

[4] Martin Thompson's blog.

Saturday, November 2, 2013

Grid Testing in a single JVM!

This week, I have been playing with a great, free, open source project called LittleGrid. You can run a whole cluster in one JVM, stopping and starting members with a single method call to emulate failover. This makes running tests as part of a continuous build process very easy and very nice.

It does all this cleverness by having a different class loader for each instance. This can cause some confusion when you see messages that basically say: ClassCastException: cannot cast class YourClass to class YourClass. Huh? Well, of course, a class is defined by its class loader not just its fully qualified name.

You can get around this by introspectively instantiating a helper class using a cluster member's class loader. This is how we configured a mock Spring framework for all the cluster members.

Since I am relatively new to Coherence, it was gratifying to sanity check some of its features. For instance, in Coherence you can add a map entry using a normal put:

import com.tangosol.net.NamedCache; 
.
.
.
    NamedCache cache = CacheFactory.getCache(CACHE_NAME);
    cache.put(key, value);

Or you could add something by invoking an Entry Processor (Coherence's equivalent of a databases Stored Procedure):

    EntryProcessor entryProcessor = new MyEntryProcessor(key, value); 
    Object         returned       = cache.invoke(key, entryProcessor); 

where my entry processor looks something like this:

class MyEntryProcessor implements Serializable, EntryProcessor {
.
.
.
    public Object process(Entry entry) { 
        BackingMapManagerContext    context     = getContext(entry); 
        Map         myCache     = context.getBackingMap(CACHE_NAME); 
        Binary                      binaryKey   = (Binary) context.getKeyToInternalConverter().convert(myKey); 
        Binary                      binaryValue = (Binary) context.getValueToInternalConverter().convert(myValue); 
        myCache.put(binaryKey, binaryValue); 
        return null;
    }

    protected BackingMapManagerContext getContext(Entry entry) {
        BinaryEntry                 binaryEntry = (BinaryEntry) entry;
        BackingMapManagerContext    context     = binaryEntry.getContext();
        return context;
    }
.
.
.

By judicious use of breakpoints, I can show that the thread that executes the entry processor blocks the put method call.

This is important in our project as we have code that extends the com.tangosol.net.cache.LocalCache and overrides the put method to do some magic sauce. This is a bit nasty as it's not a good separation of concerns and we're looking at refactoring it out. But there was a concern that the two threads may introduce a race condition. Thankfully, it appears it cannot.

[A cleaner design might have been to use listeners on the cache but in the early days of us using Coherence, the team didn't know which threads executed these listeners.

"A backing map listener ... is nothing more than a class that implements the MapListener interface. [T]hey are executed on the cache service thread which imposes a certain set of requirements and limitations on them.

"For one, just like entry processors, backing map listeners are not allowed to make a re-entrant call into the cache service that they are part of. That means that you cannot access from them any cache that belongs to the same cache service.

"Second, because they are executed synchronously on a cache service thread, it is of paramount importance that you do not do anything time consuming within the even handler... If you need to do anything that might take longer, you need to delegate it to Invocation Service, Work Manager or an external system.

"Finally, because backing map listeners are essentially the same mechanism that is used internally for backup of cache entries, the MapEvent instance they receive are not quite what you would expect and calls to getKey, getOldValue and getNewValue will return values in internal, serialized binary format."

- From Oracle Coherence 3.5].

Testing failover is much easier in LittleGrid:

int memberId = ...
ClusterMember clusterMember = memberGroup.getClusterMember(memberId);
clusterMember.shutdown();

which also gives us an opportunity to see data jumping from the backup store and into the LocalCache. By break pointing the overriden put method, you can see that this is how the data that the node was backing up adds it to its cache.

One last note: I'm currently working in the investment banking and we have the resources to pay for Coherence Enterprise edition. However, we're quite happy with the free version and have been getting good performance out of it. As a result, the tests we're running in our Continuous Integration environment are pretty much representative of what we can see in prod.