Skip to content

Commit

Permalink
Merge pull request #797 from mattrjacobs/take-histogram-off-multithre…
Browse files Browse the repository at this point in the history
…aded-read-path

Move all reads of Histograms to a single-threaded path
  • Loading branch information
mattrjacobs committed May 21, 2015
2 parents 3246e46 + d66dae4 commit cb72b94
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 23 deletions.
2 changes: 1 addition & 1 deletion hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jar {
jmh {
fork = 10
iterations = 3
jmhVersion = '1.9'
jmhVersion = '1.9.3'
profilers = ['gc']
threads = 8
warmup = '1s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -72,7 +73,8 @@ public Integer writeHeavyCommandExecution(CommandState state) {
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer writeHeavyReadMetrics(CommandState state) {
return state.command.getMetrics().getCurrentConcurrentExecutionCount();
HystrixCommandMetrics metrics = state.command.getMetrics();
return metrics.getExecutionTimeMean() + metrics.getExecutionTimePercentile(50) + metrics.getExecutionTimePercentile(75) + metrics.getExecutionTimePercentile(99);
}

@Benchmark
Expand All @@ -90,7 +92,8 @@ public Integer evenSplitOfWritesAndReadsCommandExecution(CommandState state) {
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer evenSplitOfWritesAndReadsReadMetrics(CommandState state) {
return state.command.getMetrics().getCurrentConcurrentExecutionCount();
HystrixCommandMetrics metrics = state.command.getMetrics();
return metrics.getExecutionTimeMean() + metrics.getExecutionTimePercentile(50) + metrics.getExecutionTimePercentile(75) + metrics.getExecutionTimePercentile(99);
}

@Benchmark
Expand All @@ -108,6 +111,7 @@ public Integer readHeavyCommandExecution(CommandState state) {
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer readHeavyReadMetrics(CommandState state) {
return state.command.getMetrics().getCurrentConcurrentExecutionCount();
HystrixCommandMetrics metrics = state.command.getMetrics();
return metrics.getExecutionTimeMean() + metrics.getExecutionTimePercentile(50) + metrics.getExecutionTimePercentile(75) + metrics.getExecutionTimePercentile(99);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void addValue(int... value) {

for (int v : value) {
try {
getCurrentBucket().data.addValue(v);
getCurrentBucket().bucketData.addValue(v);
} catch (Exception e) {
logger.error("Failed to add value: " + v, e);
}
Expand Down Expand Up @@ -285,7 +285,7 @@ private static class PercentileBucketData {
private final IntCountsHistogram histogram;

public PercentileBucketData() {
this.histogram = new IntCountsHistogram(3);
this.histogram = new IntCountsHistogram(4);
}

public void addValue(int... latency) {
Expand All @@ -304,36 +304,83 @@ public int length() {
*/
/* package for testing */ static class PercentileSnapshot {
private final IntCountsHistogram aggregateHistogram;
private final long count;
private final int mean;
private final int p0;
private final int p5;
private final int p10;
private final int p25;
private final int p50;
private final int p75;
private final int p90;
private final int p95;
private final int p99;
private final int p995;
private final int p999;
private final int p100;


/* package for testing */ PercentileSnapshot() {
this(new Bucket[0]);
}

/* package for testing */ PercentileSnapshot(int... data) {
aggregateHistogram = new IntCountsHistogram(4);
for (int latency: data) {
aggregateHistogram.recordValue(latency);
}
/* package for testing */ PercentileSnapshot(long startTime, int... data) {
this(new Bucket[]{new Bucket(startTime, data)});
}

/* package for testing */ PercentileSnapshot(Bucket[] buckets) {
aggregateHistogram = new IntCountsHistogram(4);
for (Bucket bucket: buckets) {
aggregateHistogram.add(bucket.data.histogram);
aggregateHistogram.add(bucket.bucketData.histogram);
}

count = aggregateHistogram.getTotalCount();
mean = (int) aggregateHistogram.getMean();
p0 = (int) aggregateHistogram.getValueAtPercentile(0);
p5 = (int) aggregateHistogram.getValueAtPercentile(5);
p10 = (int) aggregateHistogram.getValueAtPercentile(10);
p25 = (int) aggregateHistogram.getValueAtPercentile(25);
p50 = (int) aggregateHistogram.getValueAtPercentile(50);
p75 = (int) aggregateHistogram.getValueAtPercentile(75);
p90 = (int) aggregateHistogram.getValueAtPercentile(90);
p95 = (int) aggregateHistogram.getValueAtPercentile(95);
p99 = (int) aggregateHistogram.getValueAtPercentile(99);
p995 = (int) aggregateHistogram.getValueAtPercentile(99.5);
p999 = (int) aggregateHistogram.getValueAtPercentile(99.9);
p100 = (int) aggregateHistogram.getValueAtPercentile(100);
}

/* package for testing */ int getMean() {
return (int) aggregateHistogram.getMean();
return mean;
}

/**
* Provides percentile computation.
*/
public int getPercentile(double percentile) {
if (aggregateHistogram.getTotalCount() == 0) {
if (count == 0) {
return 0;
}

int permyriad = (int) (percentile * 100);
switch(permyriad) {
case 0 : return p0;
case 500 : return p5;
case 1000: return p10;
case 2500: return p25;
case 5000: return p50;
case 7500: return p75;
case 9000: return p90;
case 9500: return p95;
case 9900: return p99;
case 9950: return p995;
case 9990: return p999;
case 10000: return p100;
default: return getArbitraryPercentile(percentile);
}
}

private synchronized int getArbitraryPercentile(double percentile) {
return (int) aggregateHistogram.getValueAtPercentile(percentile);
}
}
Expand Down Expand Up @@ -519,13 +566,19 @@ private Bucket[] getArray() {
*/
/* package for testing */ static class Bucket {
final long windowStart;
final PercentileBucketData data;
final PercentileBucketData bucketData;

Bucket(long startTime) {
this.windowStart = startTime;
this.data = new PercentileBucketData();
this.bucketData = new PercentileBucketData();
}

public Bucket(long startTime, int[] data) {
this.windowStart = startTime;

this.bucketData = new PercentileBucketData();
bucketData.addValue(data);
}
}

/* package for testing */ static interface Time {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import com.netflix.hystrix.strategy.properties.HystrixProperty;
Expand All @@ -33,6 +41,23 @@ public class HystrixRollingPercentileTest {
private static final HystrixProperty<Integer> numberOfBuckets = HystrixProperty.Factory.asProperty(12); // 12 buckets at 5000ms each
private static final HystrixProperty<Boolean> enabled = HystrixProperty.Factory.asProperty(true);

private static ExecutorService threadPool;

@BeforeClass
public static void setUp() {
threadPool = Executors.newFixedThreadPool(10);
}

@AfterClass
public static void tearDown() {
threadPool.shutdown();
try {
threadPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
System.out.println("Thread pool never terminated in HystrixRollingPercentileTest");
}
}

@Test
public void testRolling() {
MockedTime time = new MockedTime();
Expand Down Expand Up @@ -78,7 +103,7 @@ public void testRolling() {
time.increment(6000);

// the rolling version should have the same data as creating a snapshot like this
PercentileSnapshot ps = new PercentileSnapshot(1000, 1000, 1000, 2000, 1000, 500, 200, 200, 1600, 200, 1600, 1600);
PercentileSnapshot ps = new PercentileSnapshot(System.currentTimeMillis(), 1000, 1000, 1000, 2000, 1000, 500, 200, 200, 1600, 200, 1600, 1600);

assertEquals(ps.getPercentile(0.15), p.getPercentile(0.15));
assertEquals(ps.getPercentile(0.50), p.getPercentile(0.50));
Expand Down Expand Up @@ -201,36 +226,36 @@ public void testSampleDataOverTime2() {
}

public PercentileSnapshot getPercentileForValues(int... values) {
return new PercentileSnapshot(values);
return new PercentileSnapshot(System.currentTimeMillis(), values);
}

@Test
public void testPercentileAlgorithm_Median1() {
PercentileSnapshot list = new PercentileSnapshot(100, 100, 100, 100, 200, 200, 200, 300, 300, 300, 300);
PercentileSnapshot list = new PercentileSnapshot(System.currentTimeMillis(), 100, 100, 100, 100, 200, 200, 200, 300, 300, 300, 300);
Assert.assertEquals(200, list.getPercentile(50));
}

@Test
public void testPercentileAlgorithm_Median2() {
PercentileSnapshot list = new PercentileSnapshot(100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 500);
PercentileSnapshot list = new PercentileSnapshot(System.currentTimeMillis(), 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 500);
Assert.assertEquals(100, list.getPercentile(50));
}

@Test
public void testPercentileAlgorithm_Median3() {
PercentileSnapshot list = new PercentileSnapshot(50, 75, 100, 125, 160, 170, 180, 200, 210, 300, 500);
PercentileSnapshot list = new PercentileSnapshot(System.currentTimeMillis(), 50, 75, 100, 125, 160, 170, 180, 200, 210, 300, 500);
Assert.assertEquals(170, list.getPercentile(50));
}

@Test
public void testPercentileAlgorithm_Median4() {
PercentileSnapshot list = new PercentileSnapshot(300, 75, 125, 500, 100, 160, 180, 200, 210, 50, 170);
PercentileSnapshot list = new PercentileSnapshot(System.currentTimeMillis(), 300, 75, 125, 500, 100, 160, 180, 200, 210, 50, 170);
Assert.assertEquals(170, list.getPercentile(50));
}

@Test
public void testPercentileAlgorithm_Extremes() {
PercentileSnapshot p = new PercentileSnapshot(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 800, 768, 657, 700, 867);
PercentileSnapshot p = new PercentileSnapshot(System.currentTimeMillis(), 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 800, 768, 657, 700, 867);

System.out.println("0.01: " + p.getPercentile(0.01));
System.out.println("10th: " + p.getPercentile(10));
Expand Down Expand Up @@ -317,6 +342,65 @@ public void increment(int millis) {

}

@Test
public void testThreadSafety() {
final MockedTime time = new MockedTime();
final HystrixRollingPercentile p = new HystrixRollingPercentile(time, HystrixProperty.Factory.asProperty(100), HystrixProperty.Factory.asProperty(25), HystrixProperty.Factory.asProperty(true));

final int NUM_THREADS = 1000;
final int NUM_ITERATIONS = 1000000;

final CountDownLatch latch = new CountDownLatch(NUM_THREADS);

final AtomicInteger aggregateMetrics = new AtomicInteger(); //same as a blackhole

final Random r = new Random();

Future<?> metricsPoller = threadPool.submit(new Runnable() {
@Override
public void run() {
while(!Thread.currentThread().isInterrupted()) {
aggregateMetrics.addAndGet(p.getMean() + p.getPercentile(10) + p.getPercentile(50) + p.getPercentile(90));
//System.out.println("AGGREGATE : " + p.getPercentile(10) + " : " + p.getPercentile(50) + " : " + p.getPercentile(90));
}
}
});

for (int i = 0; i < NUM_THREADS; i++) {
final int threadId = i;
threadPool.submit(new Runnable() {
@Override
public void run() {
for (int j = 1; j < NUM_ITERATIONS / NUM_THREADS + 1; j++) {
int nextInt = r.nextInt(100);
p.addValue(nextInt);
if (threadId == 0) {
time.increment(1);
}
}
latch.countDown();
}
});
}

try {
latch.await(100, TimeUnit.SECONDS);
metricsPoller.cancel(true);
} catch (InterruptedException ex) {
fail("Timeout on all threads writing percentiles");
}

aggregateMetrics.addAndGet(p.getMean() + p.getPercentile(10) + p.getPercentile(50) + p.getPercentile(90));
System.out.println(p.getMean() + " : " + p.getPercentile(50) + " : " + p.getPercentile(75) + " : " + p.getPercentile(90) + " : " + p.getPercentile(95) + " : " + p.getPercentile(99));
}

@Test
public void testThreadSafetyMulti() {
for (int i = 0; i < 100; i++) {
testThreadSafety();
}
}

/* sub-class to avoid 65k limit of a single class */
private static class SampleDataHolder1 {
/*
Expand Down

0 comments on commit cb72b94

Please sign in to comment.