diff --git a/java/KinesisAggregator/assembly.xml b/java/KinesisAggregator/assembly.xml
new file mode 100644
index 0000000..e26a927
--- /dev/null
+++ b/java/KinesisAggregator/assembly.xml
@@ -0,0 +1,16 @@
+
+	complete
+
+	jar
+
+	false
+
+
+	/
+	true
+	runtime
+
+
+
\ No newline at end of file
diff --git a/java/KinesisAggregator/dist/KinesisAggregator-1.0.4-complete.jar b/java/KinesisAggregator/dist/KinesisAggregator-1.0.4-complete.jar
new file mode 100644
index 0000000..806d35d
Binary files /dev/null and b/java/KinesisAggregator/dist/KinesisAggregator-1.0.4-complete.jar differ
diff --git a/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4-javadoc.jar b/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4-javadoc.jar
new file mode 100644
index 0000000..351c3ad
Binary files /dev/null and b/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4-javadoc.jar differ
diff --git a/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4-sources.jar b/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4-sources.jar
new file mode 100644
index 0000000..c09aa0d
Binary files /dev/null and b/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4-sources.jar differ
diff --git a/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4.jar b/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4.jar
new file mode 100644
index 0000000..eae2aca
Binary files /dev/null and b/java/KinesisAggregator/dist/amazon-kinesis-aggregator-1.0.4.jar differ
diff --git a/java/KinesisAggregator/pom.xml b/java/KinesisAggregator/pom.xml
index 3594e35..85940f1 100644
--- a/java/KinesisAggregator/pom.xml
+++ b/java/KinesisAggregator/pom.xml
@@ -1,4 +1,5 @@
-
 	4.0.0
@@ -7,8 +8,8 @@
 	com.amazonaws
 	amazon-kinesis-aggregator
-	1.0.3
-
+	1.0.4
+
 	jar
@@ -21,7 +22,7 @@
 		scm:git:git://github.com/awslabs/kinesis-aggregation.git
 		https://github.com/awslabs/kinesis-aggregation
-
+
 			Amazon Software License
@@ -30,16 +31,16 @@
-
-
-			amazonwebservices
-			Amazon Web Services
-			https://aws.amazon.com
-
-				developer
-
-
-
+
+
+			amazonwebservices
+			Amazon Web Services
+			https://aws.amazon.com
+
+				developer
+
+
+
 		clean compile
@@ -86,24 +87,34 @@
-				org.apache.maven.plugins
-				maven-antrun-plugin
-				1.8
-
-
-						copy
-
-
-
-
-
-						package
+				maven-assembly-plugin
+
+					KinesisAggregator-${version}
+
+						assembly.xml
+
+
+
+
+				org.apache.maven.plugins
+				maven-antrun-plugin
+				1.8
+
+
+						copy
+
+
+
+
+
+						package
-							run
-
-
-
-
+							run
+
+
+
+
@@ -125,6 +136,12 @@
 			jackson-databind
 			2.8.11.1
+
+			org.apache.commons
+			commons-lang3
+			3.8.1
+			test
+
 			junit
 			junit
diff --git a/java/KinesisAggregator/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java b/java/KinesisAggregator/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java
index ddd8995..fde5264 100644
--- a/java/KinesisAggregator/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java
+++ b/java/KinesisAggregator/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java
@@ -46,7 +46,8 @@
  *
  * This class is NOT thread-safe.
  *
- * @see https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
+ * @see https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
  */
 @NotThreadSafe
 public class AggRecord {
@@ -107,8 +108,8 @@ public AggRecord() {
 	 * Get the current number of user records contained inside this aggregate
 	 * record.
 	 *
-	 * @return The current number of user records added via the
-	 *         "addUserRecord(...)" method.
+	 * @return The current number of user records added via the "addUserRecord(...)"
+	 *         method.
 	 */
 	public int getNumUserRecords() {
 		return this.aggregatedRecordBuilder.getRecordsCount();
 	}
@@ -128,8 +129,8 @@ public int getSizeBytes() {
 	}
 
 	/**
-	 * Serialize this record to bytes. Has no side effects (i.e. does not affect
-	 * the contents of this record object).
+	 * Serialize this record to bytes. Has no side effects (i.e. does not affect the
+	 * contents of this record object).
 	 *
 	 * @return A byte array containing an Kinesis aggregated format-compatible
 	 *         Kinesis record.
@@ -157,8 +158,8 @@ public byte[] toRecordBytes() {
 	}
 
 	/**
-	 * Clears out all records and metadata from this object so that it can be
-	 * reused just like a fresh instance of this object.
+	 * Clears out all records and metadata from this object so that it can be reused
+	 * just like a fresh instance of this object.
 	 */
 	public void clear() {
 		this.md5.reset();
@@ -173,8 +174,8 @@ public void clear() {
 	/**
 	 * Get the overarching partition key for the entire aggregated record.
 	 *
-	 * @return The partition key to use for the aggregated record or null if
-	 *         this aggregated record is empty.
+	 * @return The partition key to use for the aggregated record or null if this
+	 *         aggregated record is empty.
 	 */
 	public String getPartitionKey() {
 		if (getNumUserRecords() == 0) {
@@ -199,11 +200,11 @@ public String getExplicitHashKey() {
 	}
 
 	/**
-	 * Based on the current size of this aggregated record, calculate what the
-	 * new size would be if we added another user record with the specified
-	 * parameters (used to determine when this aggregated record is full and
-	 * can't accept any more user records). This calculation is highly dependent
-	 * on the Kinesis aggregated message format.
+	 * Based on the current size of this aggregated record, calculate what the new
+	 * size would be if we added another user record with the specified parameters
+	 * (used to determine when this aggregated record is full and can't accept any
+	 * more user records). This calculation is highly dependent on the Kinesis
+	 * aggregated message format.
 	 *
 	 * @param partitionKey
 	 *            The partition key of the new record to simulate adding
@@ -211,8 +212,8 @@
 	 *            The explicit hash key of the new record to simulate adding
 	 * @param data
 	 *            The raw data of the new record to simulate adding
-	 * @return The new size of this existing record in bytes if a new user
-	 *         record with the specified parameters was added.
+	 * @return The new size of this existing record in bytes if a new user record
+	 *         with the specified parameters was added.
 	 * @see https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
 	 */
 	private int calculateRecordSize(String partitionKey, String explicitHashKey, byte[] data) {
@@ -220,7 +221,7 @@ private int calculateRecordSize(String partitionKey, String explicitHashKey, byt
 		// has the partition key been added to the table of known PKs yet?
 		if (!this.partitionKeys.contains(partitionKey)) {
-			int pkLength = partitionKey.length();
+			int pkLength = partitionKey.getBytes().length;
 			messageSize += 1; // (message index + wire type for PK table)
 			messageSize += calculateVarintSize(pkLength); // size of pk length
 															// value
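The switch from partitionKey.length() to partitionKey.getBytes().length in the hunk above matters because the protobuf size estimate counts serialized bytes, while String.length() counts UTF-16 chars; the two diverge as soon as a key contains a multi-byte character. A standalone illustration of the difference (my own snippet, not part of the patch; note the patch itself calls getBytes() with the platform default charset rather than an explicit UTF-8):

import java.nio.charset.StandardCharsets;

public class KeyLengthDemo {
	public static void main(String[] args) {
		String asciiKey = "user-42";    // every char is one byte in UTF-8
		String accentedKey = "usér-42"; // 'é' encodes to two bytes in UTF-8

		// char count and byte count agree for pure ASCII...
		System.out.println(asciiKey.length());                                // 7
		System.out.println(asciiKey.getBytes(StandardCharsets.UTF_8).length); // 7

		// ...but diverge once a multi-byte character appears
		System.out.println(accentedKey.length());                                // 7
		System.out.println(accentedKey.getBytes(StandardCharsets.UTF_8).length); // 8
	}
}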
@@ -229,10 +230,9 @@
 		// has the explicit hash key been added to the table of known EHKs yet?
 		if (!this.explicitHashKeys.contains(explicitHashKey)) {
-			int ehkLength = explicitHashKey.length();
+			int ehkLength = explicitHashKey.getBytes().length;
 			messageSize += 1; // (message index + wire type for EHK table)
-			messageSize += calculateVarintSize(
-					ehkLength); /* size of ehk length value */
+			messageSize += calculateVarintSize(ehkLength); /* size of ehk length value */
 			messageSize += ehkLength; // actual ehk length
 		}
@@ -243,34 +243,32 @@ private int calculateRecordSize(String partitionKey, String explicitHashKey, byt
 		// partition key field
 		innerRecordSize += 1; // (message index + wire type for PK index)
-		innerRecordSize += calculateVarintSize(this.partitionKeys
-				.getPotentialIndex(partitionKey)); /* size of pk index value */
+		innerRecordSize += calculateVarintSize(
+				this.partitionKeys.getPotentialIndex(partitionKey)); /* size of pk index value */
 
 		// explicit hash key field (this is optional)
 		if (explicitHashKey != null) {
 			innerRecordSize += 1; // (message index + wire type for EHK index)
-			innerRecordSize += calculateVarintSize(this.explicitHashKeys.getPotentialIndex(
-					explicitHashKey)); /* size of ehk index value */
+			innerRecordSize += calculateVarintSize(
+					this.explicitHashKeys.getPotentialIndex(explicitHashKey)); /* size of ehk index value */
 		}
 
 		// data field
 		innerRecordSize += 1; // (message index + wire type for record data)
-		innerRecordSize += calculateVarintSize(
-				data.length); /* size of data length value */
+		innerRecordSize += calculateVarintSize(data.length); /* size of data length value */
 		innerRecordSize += data.length; // actual data length
 
 		messageSize += 1; // (message index + wire type for record)
-		messageSize += calculateVarintSize(
-				innerRecordSize); /* size of entire record length value */
+		messageSize += calculateVarintSize(innerRecordSize); /* size of entire record length value */
 		messageSize += innerRecordSize; // actual entire record length
 
 		return messageSize;
 	}
 
 	/**
-	 * For an integral value represented by a varint, calculate how many bytes
-	 * are necessary to represent the value in a protobuf message.
+	 * For an integral value represented by a varint, calculate how many bytes are
+	 * necessary to represent the value in a protobuf message.
 	 *
 	 * @param value
 	 *            The value whose varint size will be calculated
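Several of the size terms in calculateRecordSize are protobuf varints, so it helps to see how the byte count grows with the value: a varint carries 7 payload bits per byte. A self-contained sketch of that rule (my own illustration of the encoding size, not the library's calculateVarintSize):

public class VarintSizeDemo {
	// Number of bytes protobuf needs to encode a non-negative value as a varint:
	// 7 payload bits per byte, minimum of one byte for zero.
	static int varintSize(long value) {
		int bytes = 0;
		do {
			bytes++;
			value >>>= 7;
		} while (value != 0);
		return bytes;
	}

	public static void main(String[] args) {
		System.out.println(varintSize(0));      // 1
		System.out.println(varintSize(127));    // 1 (fits in 7 bits)
		System.out.println(varintSize(128));    // 2 (needs a second byte)
		System.out.println(varintSize(16_383)); // 2
		System.out.println(varintSize(16_384)); // 3
	}
}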
@@ -306,8 +304,8 @@ private int calculateVarintSize(long value) {
 	}
 
 	/**
-	 * Add a new user record to this existing aggregated record if there is
-	 * enough space (based on the defined Kinesis limits for a PutRecord call).
+	 * Add a new user record to this existing aggregated record if there is enough
+	 * space (based on the defined Kinesis limits for a PutRecord call).
 	 *
 	 * @param partitionKey
 	 *            The partition key of the new user record to add
@@ -315,8 +313,8 @@ private int calculateVarintSize(long value) {
 	 *            The explicit hash key of the new user record to add
 	 * @param data
 	 *            The raw data of the new user record to add
-	 * @return True if the new user record was successfully added to this
-	 *         aggregated record or false if this aggregated record is too full.
+	 * @return True if the new user record was successfully added to this aggregated
+	 *         record or false if this aggregated record is too full.
 	 */
 	public boolean addUserRecord(String partitionKey, String explicitHashKey, byte[] data) {
 		// set the explicit hash key for the message to the partition key -
@@ -369,15 +367,14 @@ public boolean addUserRecord(String partitionKey, String explicitHashKey, byte[]
 	}
 
 	/**
-	 * Convert the aggregated data in this record into a single
-	 * PutRecordRequest. This method has no side effects (i.e. it will not clear
-	 * the current contents of the aggregated record).
+	 * Convert the aggregated data in this record into a single PutRecordRequest.
+	 * This method has no side effects (i.e. it will not clear the current contents
+	 * of the aggregated record).
 	 *
 	 * @param streamName
-	 *            The Kinesis stream name where this PutRecordRequest will be
-	 *            sent.
-	 * @return A PutRecordRequest containing all the current data in this
-	 *         aggregated record.
+	 *            The Kinesis stream name where this PutRecordRequest will be sent.
+	 * @return A PutRecordRequest containing all the current data in this aggregated
+	 *         record.
 	 */
 	public PutRecordRequest toPutRecordRequest(String streamName) {
 		byte[] recordBytes = toRecordBytes();
@@ -426,10 +423,11 @@ private void validatePartitionKey(final String partitionKey) {
 			throw new IllegalArgumentException("Partition key cannot be null");
 		}
 
-		if (partitionKey.length() < PARTITION_KEY_MIN_LENGTH || partitionKey.length() > PARTITION_KEY_MAX_LENGTH) {
+		if (partitionKey.getBytes().length < PARTITION_KEY_MIN_LENGTH
+				|| partitionKey.getBytes().length > PARTITION_KEY_MAX_LENGTH) {
 			throw new IllegalArgumentException(
 					"Invalid partition key. Length must be at least " + PARTITION_KEY_MIN_LENGTH + " and at most "
-							+ PARTITION_KEY_MAX_LENGTH + ", got length of " + partitionKey.length());
+							+ PARTITION_KEY_MAX_LENGTH + ", got length of " + partitionKey.getBytes().length);
 		}
 
 		try {
@@ -464,13 +462,13 @@ private void validateExplicitHashKey(final String explicitHashKey) {
 	}
 
 	/**
-	 * Calculate a new explicit hash key based on the input partition key
-	 * (following the algorithm from the original KPL).
+	 * Calculate a new explicit hash key based on the input partition key (following
+	 * the algorithm from the original KPL).
 	 *
 	 * @param partitionKey
 	 *            The partition key to seed the new explicit hash key with
-	 * @return An explicit hash key based on the input partition key generated
-	 *         using an algorithm from the original KPL.
+	 * @return An explicit hash key based on the input partition key generated using
+	 *         an algorithm from the original KPL.
 	 */
 	private String createExplicitHashKey(final String partitionKey) {
 		BigInteger hashKey = BigInteger.ZERO;
@@ -492,8 +490,8 @@ private String createExplicitHashKey(final String partitionKey) {
 
 	/**
 	 * A class for tracking unique partition keys or explicit hash keys for an
-	 * aggregated Kinesis record. Also assists in keeping track of indexes for
-	 * their locations in the protobuf tables.
+	 * aggregated Kinesis record. Also assists in keeping track of indexes for their
+	 * locations in the protobuf tables.
 	 */
 	private class KeySet {
 		/** The list of unique keys in this keyset. */
@@ -510,14 +508,14 @@ public KeySet() {
 		}
 
 		/**
-		 * If the input key were added to this KeySet, determine what its
-		 * resulting index would be.
+		 * If the input key were added to this KeySet, determine what its resulting
+		 * index would be.
 		 *
 		 * @param s
 		 *            The input string to potentially add to the KeySet
 		 *
-		 * @return The table index that this string would occupy if it were
-		 *         added to this KeySet.
+		 * @return The table index that this string would occupy if it were added to
+		 *         this KeySet.
 		 */
 		public Long getPotentialIndex(String s) {
 			Long it = this.lookup.get(s);
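The KeySet described in these hunks is essentially a list of unique keys plus a map from key to table index, so repeated partition/hash keys are stored once in the protobuf key table and referenced by index from each user record. A rough standalone re-implementation of that idea (illustrative only; the class and method names below are mine, not the ones in this patch):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative key table: each distinct key gets the next index in insertion order.
class SimpleKeyTable {
	private final List<String> keys = new ArrayList<>();
	private final Map<String, Long> lookup = new HashMap<>();

	// Index the key would occupy: its existing index, or the next free slot.
	long potentialIndex(String key) {
		Long existing = lookup.get(key);
		return existing != null ? existing : keys.size();
	}

	// Add the key if it is new; either way, return the index it lives at.
	long add(String key) {
		Long existing = lookup.get(key);
		if (existing != null) {
			return existing;
		}
		long index = keys.size();
		keys.add(key);
		lookup.put(key, index);
		return index;
	}
}

In that shape, the non-mutating lookup is what calculateRecordSize needs for its size estimate, while the mutating add is what grows the table when a user record is actually committed.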
@@ -534,9 +532,9 @@ public Long getPotentialIndex(String s) {
 		 * @param s
 		 *            The key to add to the keyset.
 		 *
-		 * @return A pair of . The boolean is true if this key is
-		 *         not already in this keyset, false otherwise. The long
-		 *         indicates the index of the key.
+		 * @return A pair of . The boolean is true if this key is not
+		 *         already in this keyset, false otherwise. The long indicates the index
+		 *         of the key.
 		 */
 		public ExistenceIndexPair add(String s) {
 			Long it = this.lookup.get(s);
@@ -569,8 +567,8 @@ public void clear() {
 	};
 
 	/**
-	 * A helper class for use with the KeySet that indicates whether or not a
-	 * key exists in the KeySet and what its saved index would be.
+	 * A helper class for use with the KeySet that indicates whether or not a key
+	 * exists in the KeySet and what its saved index would be.
 	 */
 	private class ExistenceIndexPair {
 		private Boolean first;
diff --git a/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/MonteCarloTestAggregation.java b/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/MonteCarloTestAggregation.java
index f32e2c3..c304694 100644
--- a/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/MonteCarloTestAggregation.java
+++ b/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/MonteCarloTestAggregation.java
@@ -16,6 +16,13 @@
  */
 package com.amazonaws.kinesis.agg;
 
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -28,6 +35,9 @@
 import com.amazonaws.kinesis.agg.RecordAggregator.RecordCompleteListener;
 
 public class MonteCarloTestAggregation {
+	private volatile int successRecords = 0;
+	private volatile int failedRecords = 0;
+
 	private final class IncrementCountListener implements RecordCompleteListener {
 		private Integer userRecordCount = 0;
 
@@ -45,52 +55,90 @@ public Integer getCount() {
 	}
 
 	private class TestRunner implements Runnable {
-		public TestRunner(int instance, int tests) {
+		public TestRunner(int instance, int messagesToSend) {
 			this.instance = instance;
-			this.maxTests = tests;
+			this.messagesToSend = messagesToSend;
 		}
 
 		private int instance;
-		private int maxTests;
+		private int messagesToSend;
 		private IncrementCountListener listener = new IncrementCountListener();
 		private Integer recordsSuccess = 0;
 		private Integer recordsFailed = 0;
+		private Integer recordRotations = 0;
+		private Charset charset = Charset.forName("UTF-8");
 
 		public int getRecordsSucess() {
 			return this.recordsSuccess;
 		}
 
+		public int getRecordsFailed() {
+			return this.recordsFailed;
+		}
+
 		public int getRecordsRotated() {
 			return this.listener.getCount();
 		}
 
+		private String truncateToFitUtf8ByteLength(String s, int maxBytes) {
+			CharsetDecoder decoder = charset.newDecoder();
+			if (s == null) {
+				return null;
+			}
+			byte[] sba = s.getBytes(charset);
+			if (sba.length <= maxBytes) {
+				return s;
+			}
+			// Ensure truncation by having byte buffer = maxBytes
+			ByteBuffer bb = ByteBuffer.wrap(sba, 0, maxBytes);
+			CharBuffer cb = CharBuffer.allocate(maxBytes);
+			// Ignore an incomplete character
+			decoder.onMalformedInput(CodingErrorAction.IGNORE);
+			decoder.decode(bb, cb, true);
+			decoder.flush(cb);
+			return new String(cb.array(), 0, cb.position());
+		}
+
 		public void run() {
 			Random r = new Random();
 			RecordAggregator agg = new RecordAggregator();
 			agg.onRecordComplete(listener);
+			boolean stop = false;
 
-			for (int i = 0; i < maxTests; i++) {
-				// generate a random partition key
-				int pkeySize = 0;
-				while (pkeySize == 0) {
-					pkeySize = r.nextInt(AggRecord.PARTITION_KEY_MAX_LENGTH);
-				}
-				byte[] blob = new byte[pkeySize];
-				r.nextBytes(blob);
-				String randomPKey = new String(blob);
-
-				// generate a random payload
-				int dataSize = r.nextInt(AggRecord.MAX_BYTES_PER_RECORD - AggRecord.AGGREGATION_OVERHEAD_BYTES);
-				byte[] randomData = new byte[dataSize];
-				r.nextBytes(randomData);
-
-				// fire the random data into the Aggregator
+			int i = 0;
+			while (i < messagesToSend && !stop) {
 				try {
-					agg.addUserRecord(randomPKey, null, randomData);
+					i++;
+					// generate a random partition key of up to PARTITION_KEY_MAX_LENGTH characters
+					int nextLen = new Double(AggRecord.PARTITION_KEY_MAX_LENGTH * r.nextDouble()).intValue();
+					byte[] randomPkB = new byte[nextLen];
+					r.nextBytes(randomPkB);
+					String randomPKey = truncateToFitUtf8ByteLength(new String(randomPkB),
+							AggRecord.PARTITION_KEY_MAX_LENGTH);
+					if (randomPKey.length() == 0) {
+						randomPKey = new Integer(r.nextInt()).toString();
+					}
+
+					// generate a random payload
+					int targetDataSize = new Double((AggRecord.MAX_BYTES_PER_RECORD
+							- AggRecord.AGGREGATION_OVERHEAD_BYTES - randomPKey.getBytes().length) * r.nextDouble())
+									.intValue();
+					int dataSize = r.nextInt(targetDataSize == 0 ? 1 : targetDataSize);
+					byte[] randomData = new byte[dataSize];
+					r.nextBytes(randomData);
+
+					// fire the random data into the Aggregator
+
+					AggRecord t = agg.addUserRecord(randomPKey, null, randomData);
+					if (t != null) {
+						this.recordRotations += 1;
+					}
 					this.recordsSuccess += 1;
 				} catch (Exception e) {
 					e.printStackTrace();
 					this.recordsFailed += 1;
+					// stop the thread
+					stop = true;
 				}
 			}
 
@@ -98,8 +146,8 @@ public void run() {
 			AggRecord lastInFlight = agg.clearAndGet();
 			listener.recordComplete(lastInFlight);
 
-			System.out.println(String.format("%s: %s requested, %s success, %s failed", this.instance, this.maxTests,
-					this.recordsSuccess, this.recordsFailed));
+			System.out.println(String.format("%s: %s requested, %s success, %s failed, %s rotations", this.instance,
+					this.messagesToSend, this.recordsSuccess, this.recordsFailed, this.recordRotations));
 		}
 	}
 
@@ -110,12 +158,12 @@ public void monteCarloTest() throws Exception {
 		int threads = cpus - 1;
 		System.out.println("Created a test runner pool of " + threads + " threads");
 		ExecutorService threadPool = Executors.newFixedThreadPool(threads);
-		int totalTests = 1_000_000;
+		int totalMessages = 1_000_000;
 
 		List runners = new ArrayList(threads);
 		List running = new ArrayList(threads);
 		for (int i = 0; i < threads; i++) {
-			TestRunner task = new TestRunner(i, totalTests / threads);
+			TestRunner task = new TestRunner(i, totalMessages / threads);
 			runners.add(task);
 			running.add(threadPool.submit(task));
 		}
@@ -132,15 +180,14 @@ public void monteCarloTest() throws Exception {
 			}
 		}
 
-		System.out.println("Tests Completed");
-
 		int countSuccess = 0;
-		int countAgg = 0;
+		int countFailed = 0;
 		for (TestRunner t : runners) {
 			countSuccess += t.getRecordsSucess();
-			countAgg += t.getRecordsRotated();
+			countFailed += t.getRecordsFailed();
 		}
 
-		org.junit.Assert.assertEquals("Correct User Record Count", countSuccess, countAgg);
+		assertEquals("Correct User Record Count", totalMessages, countSuccess);
+		assertEquals("No Failed Records", 0, countFailed);
 	}
-}
+}
\ No newline at end of file
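The truncateToFitUtf8ByteLength helper added in the test above exists because slicing a UTF-8 byte array at an arbitrary offset can cut a multi-byte character in half; decoding the slice with CodingErrorAction.IGNORE silently drops the dangling fragment. A small standalone check of that behaviour (my own harness using the same decoder approach; not part of the patch):

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class Utf8TruncateDemo {
	// Truncate s so its UTF-8 encoding is at most maxBytes, never splitting a character.
	static String truncateUtf8(String s, int maxBytes) {
		byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
		if (bytes.length <= maxBytes) {
			return s;
		}
		CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
				.onMalformedInput(CodingErrorAction.IGNORE);
		ByteBuffer bb = ByteBuffer.wrap(bytes, 0, maxBytes);
		CharBuffer cb = CharBuffer.allocate(maxBytes);
		decoder.decode(bb, cb, true);
		decoder.flush(cb);
		return new String(cb.array(), 0, cb.position());
	}

	public static void main(String[] args) {
		String key = "aé";                // 'a' = 1 byte, 'é' = 2 bytes in UTF-8
		String cut = truncateUtf8(key, 2); // a naive 2-byte slice would split 'é'
		System.out.println(cut);           // prints "a" - the partial character is dropped
		System.out.println(cut.getBytes(StandardCharsets.UTF_8).length); // 1, within budget
	}
}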
diff --git a/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java b/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java
index ce2386c..38b9cb7 100644
--- a/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java
+++ b/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java
@@ -25,7 +25,6 @@ public class RecordAggregatorTest {
 	protected final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
-	protected final String REVERSE_ALPHABET = "zyxwvutsrqponmlkjihgfedcba";
 
 	@Test
 	public void testSingleUserRecord()
diff --git a/java/KinesisDeaggregator/pom.xml b/java/KinesisDeaggregator/pom.xml
index 787c428..5610f0e 100644
--- a/java/KinesisDeaggregator/pom.xml
+++ b/java/KinesisDeaggregator/pom.xml
@@ -1,14 +1,15 @@
-
 	4.0.0
 	amazon-kinesis-deaggregator
 	A library for performing in-memory deaggregation of Kinesis aggregated stream records.
-
+
 	com.amazonaws
 	amazon-kinesis-deaggregator
 	1.0.3
-
+
 	jar
@@ -20,7 +21,7 @@
 		scm:git:git://github.com/awslabs/kinesis-aggregation.git
 		https://github.com/awslabs/kinesis-aggregation
-
+
 			Amazon Software License
@@ -29,16 +30,16 @@
-
-
-			amazonwebservices
-			Amazon Web Services
-			https://aws.amazon.com
-
-				developer
-
-
-
+
+
+			amazonwebservices
+			Amazon Web Services
+			https://aws.amazon.com
+
+				developer
+
+
+
 		clean compile
@@ -85,24 +86,25 @@
-				org.apache.maven.plugins
-				maven-antrun-plugin
-				1.8
-
-
-						copy
-
-
-
-
-
-						package
+				org.apache.maven.plugins
+				maven-antrun-plugin
+				1.8
+
+
+						copy
+
+
+
+
+
+						package
-							run
-
-
-
-
+							run
+
+
+
+
@@ -111,7 +113,18 @@
 			com.amazonaws
 			amazon-kinesis-client
-			1.8.8
+			1.9.2
+
+
+					com.fasterxml.jackson.core
+					jackson-databind
+
+
+
+
+			com.fasterxml.jackson.core
+			jackson-databind
+			2.8.11.1
 			com.amazonaws
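Taken together, the test changes in this diff drive the aggregator roughly as follows. This is a condensed sketch built only from the calls visible in the patch (addUserRecord handing back a completed AggRecord when the in-flight record rotates, onRecordComplete, clearAndGet, toPutRecordRequest); the stream name and payloads are placeholders, and the actual send step is left as a comment:

import java.nio.charset.StandardCharsets;

import com.amazonaws.kinesis.agg.AggRecord;
import com.amazonaws.kinesis.agg.RecordAggregator;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

public class AggregatorFlowSketch {
	public static void main(String[] args) throws Exception {
		RecordAggregator aggregator = new RecordAggregator();

		// Optional callback whenever an aggregated record fills up and rotates.
		aggregator.onRecordComplete(record -> System.out
				.println("rotated record with " + record.getNumUserRecords() + " user records"));

		for (int i = 0; i < 10_000; i++) {
			byte[] payload = ("payload-" + i).getBytes(StandardCharsets.UTF_8);
			// addUserRecord returns a full AggRecord once the current one cannot fit more data.
			AggRecord full = aggregator.addUserRecord("pk-" + (i % 100), null, payload);
			if (full != null) {
				PutRecordRequest request = full.toPutRecordRequest("my-example-stream");
				// send the request with an AmazonKinesis client here
			}
		}

		// Flush whatever is still in flight at the end.
		AggRecord last = aggregator.clearAndGet();
		if (last != null) {
			PutRecordRequest request = last.toPutRecordRequest("my-example-stream");
			// send the request with an AmazonKinesis client here
		}
	}
}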