Amazon – Streams, Producers, and Consumers

To create a stream

  1. Open the Amazon Kinesis console at https://console.aws.amazon.com/kinesis.
  2. Choose Go to Streams.
  3. In the navigation bar, expand the region selector and choose a region.
  4. Choose Create Stream.
  5. Type a name for your stream (for example, StockTradeStream).
  6. Type 1 for the number of shards, but leave Estimate the number of shards you’ll need collapsed.
  7. Choose Create.
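Step 5 only asks for a name, but the Kinesis API reference limits stream names to 1 to 128 characters drawn from letters, digits, underscore (_), period (.) and hyphen (-). The following minimal Java sketch checks a candidate name against that rule before you submit it. The class and method names (StreamNameCheck, isValid) are hypothetical helpers, not part of any AWS SDK.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: checks a proposed stream name against the documented StreamName constraints. */
final class StreamNameCheck {
    // 1 to 128 characters from [a-zA-Z0-9_.-]
    private static final Pattern ALLOWED = Pattern.compile("[a-zA-Z0-9_.-]{1,128}");

    private StreamNameCheck() {}

    static boolean isValid(String name) {
        return name != null && ALLOWED.matcher(name).matches();
    }
}
```

For example, StockTradeStream passes, while a name containing spaces, or an empty name, does not.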

Create IAM Policy and User

Producer

Actions | Resource | Purpose
DescribeStream | Amazon Kinesis stream | Before attempting to write records, the producer should check whether the stream exists and is active.
PutRecord, PutRecords | Amazon Kinesis stream | Write records to Streams.

Consumer

Actions | Resource | Purpose
DescribeStream | Amazon Kinesis stream | Before attempting to read records, the consumer checks whether the stream exists and is active, and whether the shards are contained in the stream.
GetRecords, GetShardIterator | Amazon Kinesis stream | Read records from a Streams shard.
CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB table | If the consumer is developed using the Kinesis Client Library (KCL), it needs permissions to a DynamoDB table to track the processing state of the application. The first consumer started creates the table.
DeleteItem | Amazon DynamoDB table | Needed when the consumer performs split/merge operations on Streams shards.
PutMetricData | Amazon CloudWatch | The KCL also uploads metrics to CloudWatch, which are useful for monitoring the application.

To create an IAM policy

  1. Determine the Amazon Resource Name (ARN) for the new stream. The ARN format is as follows:
    arn:aws:kinesis:region:account:stream/name

    region
    The region code; for example, us-west-2. For more information, see Region and Availability Zone Concepts.

    account
    The AWS account ID, as shown in Account Settings.

    name
    The name of the stream created earlier; for example, StockTradeStream.

  2. In the IAM console, choose Policies and then Create Policy. Choose Select next to Policy Generator.
  3. Choose Amazon Kinesis as the AWS service.
  4. Select DescribeStream, GetShardIterator, GetRecords, PutRecord, and PutRecords as the allowed actions.
  5. Type the stream ARN that you determined in step 1.
  6. Use Add Statement for each of the following:
      AWS Service | Actions | ARN
      Amazon DynamoDB | CreateTable, DeleteItem, DescribeTable, GetItem, PutItem, Scan, UpdateItem | The ARN of the DynamoDB table the KCL uses, in the form arn:aws:dynamodb:region:account:table/name, where name is the table name (by default, the KCL application name)
      Amazon CloudWatch | PutMetricData | *

      The asterisk (*) is used when no specific ARN applies. Here, there is no specific CloudWatch resource on which the PutMetricData action is invoked.
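For reference, the statements assembled in the steps above amount to a JSON policy document along the lines of the one this minimal Java sketch builds as a string. The class and method names are hypothetical, and the sketch assumes its inputs contain no characters that need JSON escaping. The console's policy generator produces the equivalent document for you, possibly formatted differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles the IAM policy described in the steps above. */
final class KinesisPolicySketch {
    private KinesisPolicySketch() {}

    // arn:aws:kinesis:region:account:stream/name
    static String streamArn(String region, String account, String streamName) {
        return "arn:aws:kinesis:" + region + ":" + account + ":stream/" + streamName;
    }

    // arn:aws:dynamodb:region:account:table/name (the KCL's state table)
    static String tableArn(String region, String account, String tableName) {
        return "arn:aws:dynamodb:" + region + ":" + account + ":table/" + tableName;
    }

    // One "Allow" statement; each action carries its service prefix, e.g. "kinesis:PutRecord".
    // Inputs are assumed to need no JSON escaping.
    static String allow(List<String> actions, String resource) {
        String actionList = actions.stream()
                .map(action -> "\"" + action + "\"")
                .collect(Collectors.joining(", "));
        return "{\"Effect\": \"Allow\", \"Action\": [" + actionList + "], \"Resource\": \"" + resource + "\"}";
    }

    static String policy(String region, String account, String streamName, String tableName) {
        String kinesis = allow(Arrays.asList("kinesis:DescribeStream", "kinesis:GetShardIterator",
                "kinesis:GetRecords", "kinesis:PutRecord", "kinesis:PutRecords"),
                streamArn(region, account, streamName));
        String dynamoDb = allow(Arrays.asList("dynamodb:CreateTable", "dynamodb:DeleteItem",
                "dynamodb:DescribeTable", "dynamodb:GetItem", "dynamodb:PutItem",
                "dynamodb:Scan", "dynamodb:UpdateItem"),
                tableArn(region, account, tableName));
        // PutMetricData is not invoked on a specific CloudWatch resource, hence "*".
        String cloudWatch = allow(Arrays.asList("cloudwatch:PutMetricData"), "*");
        return "{\"Version\": \"2012-10-17\", \"Statement\": ["
                + kinesis + ", " + dynamoDb + ", " + cloudWatch + "]}";
    }
}
```

For example, policy("us-west-2", "123456789012", "StockTradeStream", "MyKclApp") uses a placeholder account ID and a hypothetical KCL application name; substitute your own values.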

Amazon – Kinesis Streams Producers – Using the Kinesis Streams API with the AWS SDK for Java

PutRecords Example

The following code creates 100 data records with sequential partition keys and puts them in a stream called DataStream.

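import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// credentialsProvider is an AWSCredentialsProvider configured elsewhere.
AmazonKinesisClient amazonKinesisClient = new AmazonKinesisClient(credentialsProvider);
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName("DataStream");
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    // Encode explicitly as UTF-8 rather than relying on the platform default charset.
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes(StandardCharsets.UTF_8)));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
// PutRecords is not all-or-nothing: individual records can fail even when the call succeeds.
System.out.println("Put result: " + putRecordsResult);
System.out.println("Failed record count: " + putRecordsResult.getFailedRecordCount());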
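A successful putRecords call does not guarantee that every record was written: FailedRecordCount reports how many entries failed, and each failed result entry carries an ErrorCode such as ProvisionedThroughputExceededException or InternalFailure. Because result entries come back in the same order as the request entries, the records to resend can be picked out by position. This SDK-independent Java sketch shows that selection step; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for retrying the failed part of a PutRecords batch. */
final class PutRecordsRetry {
    private PutRecordsRetry() {}

    /**
     * Returns the request entries whose matching result has an error code.
     * requestEntries and errorCodes must be in the same order (one result per
     * request entry); a null error code means that record was written.
     */
    static <T> List<T> failedEntries(List<T> requestEntries, List<String> errorCodes) {
        if (requestEntries.size() != errorCodes.size()) {
            throw new IllegalArgumentException("expected one result per request entry");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < requestEntries.size(); i++) {
            if (errorCodes.get(i) != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }
}
```

With the AWS SDK for Java, the error codes can be collected from putRecordsResult.getRecords() using PutRecordsResultEntry.getErrorCode(), and the returned entries resent in a new PutRecordsRequest, preferably after a backoff delay.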

Amazon – Kinesis Streams Producers – Using the Kinesis Producer Library

Role of the KPL

The KPL is an easy-to-use, highly configurable library that helps you write to an Amazon Kinesis stream. It acts as an intermediary between your producer application code and the Streams API actions. The KPL performs the following primary tasks:

  • Writes to one or more Amazon Kinesis streams with an automatic and configurable retry mechanism
  • Collects records and uses PutRecords to write multiple records to multiple shards per request
  • Aggregates user records to increase payload size and improve throughput
  • Integrates seamlessly with the Amazon Kinesis Client Library (KCL) to de-aggregate batched records on the consumer
  • Submits Amazon CloudWatch metrics on your behalf to provide visibility into producer performance
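The second task, collecting records into PutRecords calls, can be illustrated with the simplified Java sketch below. It is not the KPL's actual algorithm (the KPL also aggregates several user records into one Kinesis record and handles retries); it only packs records, in order, so that each batch respects the PutRecords limits of 500 records and 5 MiB per request (partition keys included) and 1 MiB of data per record. The class and the Record type are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical sketch of packing records into PutRecords-sized batches. */
final class PutRecordsBatcher {
    static final int MAX_RECORDS_PER_REQUEST = 500;
    static final long MAX_DATA_BYTES_PER_RECORD = 1024L * 1024;   // 1 MiB data blob
    static final long MAX_BYTES_PER_REQUEST = 5L * 1024 * 1024;   // 5 MiB, partition keys included

    /** Hypothetical record type: a partition key plus its data payload. */
    static final class Record {
        final String partitionKey;
        final byte[] data;

        Record(String partitionKey, byte[] data) {
            this.partitionKey = partitionKey;
            this.data = data;
        }

        // Bytes this record contributes toward the per-request limit.
        long requestBytes() {
            return data.length + partitionKey.getBytes(StandardCharsets.UTF_8).length;
        }
    }

    private PutRecordsBatcher() {}

    /** Greedily packs records, in their original order, into batches that each fit one request. */
    static List<List<Record>> batch(List<Record> records) {
        List<List<Record>> batches = new ArrayList<>();
        List<Record> current = new ArrayList<>();
        long currentBytes = 0;
        for (Record record : records) {
            if (record.data.length > MAX_DATA_BYTES_PER_RECORD) {
                throw new IllegalArgumentException("record data exceeds 1 MiB: " + record.partitionKey);
            }
            long size = record.requestBytes();
            boolean wouldOverflow = current.size() == MAX_RECORDS_PER_REQUEST
                    || currentBytes + size > MAX_BYTES_PER_REQUEST;
            if (wouldOverflow && !current.isEmpty()) {
                batches.add(current); // close the full batch and start a new one
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

For example, 1,001 tiny records yield batches of 500, 500, and 1, while five records of exactly 1 MiB each yield batches of 4 and 1, because the partition keys push a fifth record past 5 MiB.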

Note that the KPL is different from the Streams API that is available in the AWS SDKs. The Streams API helps you manage many aspects of Streams (including creating streams, resharding, and putting and getting records), while the KPL provides a layer of abstraction specifically for ingesting data.

Amazon – Kinesis – Describe Stream

After the createStreamRequest object is configured, create a stream by calling the createStream method on the client.

After calling createStream, wait for the stream to reach the ACTIVE state before performing any operations on the stream. To check the state of the stream, call the describeStream method.

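import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;

// client, createStreamRequest and myStreamName are set up elsewhere.
client.createStream( createStreamRequest );
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

// Poll every 20 seconds, for up to 10 minutes, until the stream reports ACTIVE.
long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
boolean isActive = false;
while ( !isActive && System.currentTimeMillis() < endTime ) {
  try {
    Thread.sleep( 20 * 1000 );
  }
  catch ( InterruptedException e ) {
    // Restore the interrupt flag and stop waiting.
    Thread.currentThread().interrupt();
    break;
  }

  try {
    DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    isActive = "ACTIVE".equals( streamStatus );
  }
  catch ( ResourceNotFoundException e ) {
    // The new stream may not be visible yet; keep polling.
  }
}
if ( !isActive ) {
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}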
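The polling loop above can also be expressed as a small reusable helper. In this minimal Java sketch (hypothetical names throughout), the status lookup and the sleep are passed in as functions, so the timeout logic can be exercised without AWS; a fixed number of attempts stands in for the wall-clock deadline used above.

```java
import java.util.function.Supplier;

/** Hypothetical helper: polls a status lookup until the stream is ACTIVE. */
final class StreamWaiter {
    private StreamWaiter() {}

    /** Pause between polls; injectable so that tests need not really sleep. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /**
     * Returns true as soon as statusLookup reports "ACTIVE", or false once
     * maxAttempts lookups have been made (or the thread is interrupted).
     * statusLookup may return null, e.g. while the new stream is not yet visible.
     */
    static boolean waitForActive(Supplier<String> statusLookup, int maxAttempts,
                                 long pollIntervalMillis, Sleeper sleeper) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if ("ACTIVE".equals(statusLookup.get())) {
                return true;
            }
            if (attempt == maxAttempts) {
                break; // no need to sleep after the final lookup
            }
            try {
                sleeper.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
        return false;
    }
}
```

With the SDK, the lookup could wrap client.describeStream(myStreamName).getStreamDescription().getStreamStatus() and return null when ResourceNotFoundException is thrown; 30 attempts, a 20-second interval, and Thread::sleep give roughly the 10-minute budget of the loop above.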

Amazon – Kinesis – Create Stream Client and Stream

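The following code creates the Streams client and sets the endpoint information for the client. This overload of setEndpoint also includes the service name and region; set serviceName to kinesis.

import com.amazonaws.services.kinesis.AmazonKinesisClient;

// Values for US East (N. Virginia), taken from the Amazon Kinesis Streams endpoint table.
String endpoint = "kinesis.us-east-1.amazonaws.com";
String serviceName = "kinesis";
String regionId = "us-east-1";

// client is an AmazonKinesisClient field declared elsewhere.
client = new AmazonKinesisClient();
client.setEndpoint(endpoint, serviceName, regionId);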


Amazon API Gateway

Region Name | Region | Endpoint | Protocol
US East (N. Virginia) | us-east-1 | apigateway.us-east-1.amazonaws.com | HTTPS

Amazon Kinesis Streams

Region Name | Region | Endpoint | Protocol
US East (N. Virginia) | us-east-1 | kinesis.us-east-1.amazonaws.com | HTTPS
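Both endpoint tables follow the same shape: service name, region code, then amazonaws.com. That is also how the endpoint, serviceName, and regionId arguments to setEndpoint relate. The minimal Java sketch below rebuilds those hostnames; treat the pattern as an assumption that covers standard commercial regions only, and check the published endpoint list for anything else. The class and method names are hypothetical.

```java
/** Hypothetical helper that rebuilds the endpoint pattern shown in the tables above. */
final class Endpoints {
    private Endpoints() {}

    // Assumption: standard commercial regions use {service}.{region}.amazonaws.com;
    // other partitions (for example, the China regions) use a different domain.
    static String hostname(String serviceName, String regionId) {
        return serviceName + "." + regionId + ".amazonaws.com";
    }

    // Both tables list HTTPS as the protocol.
    static String httpsUrl(String serviceName, String regionId) {
        return "https://" + hostname(serviceName, regionId);
    }
}
```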

Create the Stream

Now that you have created your Streams client, you can create a stream to work with, either in the Streams console or programmatically. To create a stream programmatically, instantiate a CreateStreamRequest object and specify a name for the stream and the number of shards for the stream to use.

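import com.amazonaws.services.kinesis.model.CreateStreamRequest;

// myStreamName (String) and myStreamSize (Integer, the number of shards) are defined elsewhere.
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName( myStreamName );
createStreamRequest.setShardCount( myStreamSize );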
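The shard count matters because Kinesis Streams maps each partition key to a 128-bit hash key using MD5, and each shard owns a contiguous range of hash keys. The minimal Java sketch below illustrates that mapping under a stated assumption: the shards split the hash key space into equal ranges, as a newly created stream's shards roughly do. Exact boundaries may differ by rounding, ranges can become unequal after resharding, and the authoritative ranges for a stream are the ones DescribeStream reports. The class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of how partition keys map onto evenly split shards. */
final class ShardMapping {
    // Hash keys are unsigned 128-bit integers: 0 .. 2^128 - 1.
    static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    private ShardMapping() {}

    /** MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    /** Index of the shard owning hashKey, assuming shardCount equal, contiguous ranges. */
    static int shardIndex(BigInteger hashKey, int shardCount) {
        return hashKey.multiply(BigInteger.valueOf(shardCount)).divide(HASH_KEY_SPACE).intValue();
    }
}
```

With one shard, as in the console walkthrough, every key lands in shard 0; with more shards, distinct partition keys such as partitionKey-0 through partitionKey-99 spread across them.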