
Amazon Kinesis Guide

Managed Streaming Service on AWS


Overview

Amazon Kinesis is a fully managed streaming service on AWS. It simplifies streaming infrastructure by handling operations like provisioning, scaling, and replication. This guide covers Kinesis Data Streams, Firehose, and Analytics for data engineering workloads.


Kinesis Services


Kinesis Data Streams

Core Concepts

Concept | Description
------- | -----------
Shard | Unit of throughput (1 MB/s input, 2 MB/s output)
Producer | Writes data to a stream
Consumer | Reads data from a stream
Record | Single data unit
Sequence Number | Ordering within a shard


Producer Example

import boto3
import json

kinesis = boto3.client('kinesis')

# Create a stream with 4 shards (4 MB/s input, 8 MB/s output)
kinesis.create_stream(
    StreamName='events-stream',
    ShardCount=4
)

# Batch put (up to 500 records per PutRecords call)
def send_events(events):
    records = [{
        'Data': json.dumps(event).encode('utf-8'),
        'PartitionKey': event['user_id']  # Hash of the key routes the record to a shard
    } for event in events]
    kinesis.put_records(
        StreamName='events-stream',
        Records=records
    )

# Single record
def send_event(event):
    kinesis.put_record(
        StreamName='events-stream',
        Data=json.dumps(event).encode('utf-8'),
        PartitionKey='user_123'
    )

Consumer Example

import boto3
import json
import time

kinesis = boto3.client('kinesis')

# Get a shard iterator starting at the tip of the shard
response = kinesis.get_shard_iterator(
    StreamName='events-stream',
    ShardId='shard-000000000001',
    ShardIteratorType='LATEST'
)
shard_iterator = response['ShardIterator']

while shard_iterator:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100  # Return at most 100 records (the API maximum is 10,000)
    )

    # Process records
    for record in response['Records']:
        event = json.loads(record['Data'])
        process_event(event)  # Your handler

    if not response['Records']:
        time.sleep(1)  # Back off briefly when the shard is caught up

    # NextShardIterator resumes where this call left off; it is absent
    # once the shard has been closed (e.g. after a reshard)
    shard_iterator = response.get('NextShardIterator')
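
This loop reads one hard-coded shard. A production consumer first enumerates the stream's shards and tracks an iterator per shard, for example:

shards = kinesis.list_shards(StreamName='events-stream')['Shards']
for shard in shards:
    print(shard['ShardId'])  # Track one iterator (and checkpoint) per shard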

Kinesis Client Library (KCL)

Enhanced Fan-Out

Enhanced Fan-Out: each registered consumer gets its own dedicated 2 MB/s of read throughput per shard, rather than sharing the shard's 2 MB/s with every other consumer.

A consumer sketch using the amazon_kclpy package; with the Python KCL, settings such as stream name, application name, and initial position live in a .properties file read by the MultiLangDaemon, not in code:

import json
from amazon_kclpy import kcl
from amazon_kclpy.v2 import processor

class EventProcessor(processor.RecordProcessorBase):
    def initialize(self, initialize_input):
        pass

    def process_records(self, process_records_input):
        for record in process_records_input.records:
            # binary_data is the base64-decoded record payload
            event = json.loads(record.binary_data.decode('utf-8'))
            # ... handle event ...

    def shutdown(self, shutdown_input):
        # Checkpoint on TERMINATE so the shard's lease can move on cleanly
        if shutdown_input.reason == 'TERMINATE':
            shutdown_input.checkpointer.checkpoint()

if __name__ == '__main__':
    kcl.KCLProcess(EventProcessor()).run()
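
Enhanced fan-out can also be driven directly through the low-level API: register a consumer on the stream, then subscribe to each shard over HTTP/2. A sketch (the ARNs are placeholders, and the consumer must reach ACTIVE status before subscribing):

import boto3

kinesis = boto3.client('kinesis')

# Register a dedicated consumer on the stream
consumer = kinesis.register_stream_consumer(
    StreamARN='arn:aws:kinesis:us-east-1:account:stream/events-stream',
    ConsumerName='event-processor'
)

# SubscribeToShard pushes records for up to 5 minutes per subscription,
# after which the consumer re-subscribes
subscription = kinesis.subscribe_to_shard(
    ConsumerARN=consumer['Consumer']['ConsumerARN'],
    ShardId='shard-000000000001',
    StartingPosition={'Type': 'LATEST'}
)
for event in subscription['EventStream']:
    for record in event['SubscribeToShardEvent']['Records']:
        pass  # Handle record['Data']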

Kinesis Firehose

Auto-Delivery

Firehose buffers incoming records and delivers them automatically to destinations such as S3, Redshift, and OpenSearch; there are no shards to provision or manage.

Firehose Configuration

import boto3
import json

firehose = boto3.client('firehose')

# Create delivery stream
firehose.create_delivery_stream(
    DeliveryStreamName='events-to-s3',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::account:role/FirehoseRole',
        'BucketARN': 'arn:aws:s3:::events-bucket',  # S3 bucket ARNs carry no account ID
        'Prefix': 'events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/',
        'ErrorOutputPrefix': 'errors/'
    }
)

# Put record (append a newline so the S3 objects stay line-delimited)
def send_event(event):
    firehose.put_record(
        DeliveryStreamName='events-to-s3',
        Record={'Data': json.dumps(event).encode('utf-8') + b'\n'}
    )
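
Firehose flushes its buffer on whichever limit is reached first, size or time; both are set through BufferingHints in the destination configuration. A sketch with illustrative values:

# Deliver to S3 every 64 MB or 300 seconds, whichever comes first
firehose.create_delivery_stream(
    DeliveryStreamName='events-to-s3',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::account:role/FirehoseRole',
        'BucketARN': 'arn:aws:s3:::events-bucket',
        'BufferingHints': {
            'SizeInMBs': 64,
            'IntervalInSeconds': 300
        }
    }
)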

Kinesis Analytics

SQL Queries on Streams

-- In Kinesis Data Analytics for SQL, the input is attached via the
-- console/API; the in-application input stream is named
-- SOURCE_SQL_STREAM_001 by default.

-- Output (in-application) stream
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    user_id     BIGINT,
    event_type  VARCHAR(50),
    event_count INTEGER
);

-- A pump is the continuous query that feeds the output stream
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
    user_id,
    event_type,
    COUNT(*) AS event_count
FROM "SOURCE_SQL_STREAM_001"
-- Tumbling 60-second window on the row time
GROUP BY user_id, event_type,
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
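
The application itself is created through the kinesisanalytics API, with the SQL attached as ApplicationCode. A sketch, assuming a JSON input mapped to the columns above (the ARNs and sql_code variable are placeholders):

import boto3

analytics = boto3.client('kinesisanalytics')

analytics.create_application(
    ApplicationName='stream_app',
    ApplicationCode=sql_code,  # the SQL shown above, as one string
    Inputs=[{
        'NamePrefix': 'SOURCE_SQL_STREAM',  # yields SOURCE_SQL_STREAM_001
        'KinesisStreamsInput': {
            'ResourceARN': 'arn:aws:kinesis:us-east-1:account:stream/events-stream',
            'RoleARN': 'arn:aws:iam::account:role/AnalyticsRole'
        },
        'InputSchema': {
            'RecordFormat': {'RecordFormatType': 'JSON'},
            'RecordColumns': [
                {'Name': 'user_id', 'SqlType': 'BIGINT', 'Mapping': '$.user_id'},
                {'Name': 'event_type', 'SqlType': 'VARCHAR(50)', 'Mapping': '$.event_type'}
            ]
        }
    }]
)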

Performance Tuning

Sizing the Shard Count

Required shards = peak throughput / per-shard throughput, computed for both directions.

Example:
- Peak throughput: 10 MB/s in and out
- Per-shard input limit: 1 MB/s
- Per-shard output limit: 2 MB/s

For writes (input): 10 MB/s / 1 MB/s = 10 shards minimum
For reads (output): 10 MB/s / 2 MB/s = 5 shards minimum
Take the larger: max(10, 5) = 10 shards
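
The same calculation as a helper (a sketch; the limits are the documented 1 MB/s in and 2 MB/s out per shard):

import math

def required_shards(peak_in_mbps, peak_out_mbps):
    # Size for whichever direction needs more shards
    return max(math.ceil(peak_in_mbps / 1.0),   # input: 1 MB/s per shard
               math.ceil(peak_out_mbps / 2.0))  # output: 2 MB/s per shard

print(required_shards(10, 10))  # -> 10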

Scaling Strategy

Shard splitting: splitting does not move existing data. The parent shard is closed, new records flow to the two child shards, and records already in the parent stay readable until retention expires. Consumers should finish the parent before its children to preserve per-key ordering, so resharding can cause temporary processing delays.
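
For provisioned streams, UpdateShardCount performs the necessary splits and merges in one call (a sketch; uniform scaling allows at most a doubling or halving per call):

kinesis.update_shard_count(
    StreamName='events-stream',
    TargetShardCount=20,
    ScalingType='UNIFORM_SCALING'
)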


Cost Optimization

Pricing Model

Component | Pricing | Example Cost
--------- | ------- | ------------
Data Streams | $0.015/GB + $0.012/shard/hour | ~$0.027/GB (ingestion + shards)
Firehose | $0.029/GB + $0.001-0.002/PUT | ~$0.03/GB
Analytics | $0.015/GB + $0.02/complex query | ~$0.035/GB

Cost Example

Scenario: 10,000 messages/sec at 1 KB each ≈ 10 MB/s, or roughly 26 TB/month

Component | Monthly Cost
--------- | ------------
Data ingestion | 26,000 GB × $0.015/GB = $390.00
Shards | 10 × $0.012/hour × 730 hours = $87.60
Total | ~$478/month
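
A quick sanity check of the arithmetic (decimal units, matching the table):

ingestion = 26_000 * 0.015            # GB/month × $/GB = $390.00
shards    = 10 * 0.012 * 730          # shards × $/shard-hour × hours/month = $87.60
print(f"${ingestion + shards:,.2f}/month")  # -> $477.60/month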

Monitoring

CloudWatch Metrics

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

# GetMetricStatistics fetches one metric at a time
def get_stream_metric(stream_name, metric_name):
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=1)
    return cloudwatch.get_metric_statistics(
        Namespace='AWS/Kinesis',
        MetricName=metric_name,
        Dimensions=[{'Name': 'StreamName', 'Value': stream_name}],
        StartTime=start_time,
        EndTime=end_time,
        Period=300,  # 5-minute periods
        Statistics=['Average']
    )

# Metrics worth watching: GetRecords.IteratorAgeMilliseconds,
# GetRecords.Success, PutRecords.Success, IncomingBytes, OutgoingBytes
metrics = get_stream_metric('events-stream', 'GetRecords.IteratorAgeMilliseconds')

Alerting Rules

alerts:
  - name: "High consumer lag"
    metric: GetRecords.IteratorAgeMilliseconds
    threshold: 10000   # 10 seconds
    action: "Scale consumers"
  - name: "Low throughput"
    metric: PutRecords.Success
    threshold: 0.95    # Alert when success rate drops below 95%
    action: "Investigate producer"

Security

Server-Side Encryption (SSE)

# CreateStream takes no encryption parameters; enable SSE after creation
kinesis.create_stream(
    StreamName='encrypted-stream',
    ShardCount=2,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

kinesis.start_stream_encryption(
    StreamName='encrypted-stream',
    EncryptionType='KMS',
    KeyId='arn:aws:kms:us-east-1:account:key/12345678-1234-1234-1234-567890'
)

Access Control

# IAM policy for a producer writing to a KMS-encrypted stream
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:account:stream/events-stream"
        },
        {
            "Effect": "Allow",
            "Action": "kms:GenerateDataKey",
            "Resource": "arn:aws:kms:us-east-1:account:key/12345678-1234-1234-1234-567890"
        }
    ]
}

Comparison with Alternatives

Feature | Kinesis | Kafka (MSK) | Kafka (self-hosted)
------- | ------- | ----------- | -------------------
Management | Fully managed | Partially managed | Self-managed
Scaling | Auto-scaling (on-demand mode) | Manual rebalancing | Manual rebalancing
Operations | Minimal | Medium | High
Cost (at scale) | Higher | Medium | Lower
Best for | AWS-centric workloads, simplicity | Kafka compatibility | Full control

Best Practices

DO

# 1. Use batch puts for efficiency
kinesis.put_records(StreamName='stream', Records=[...])
# 2. Enable server-side encryption
kinesis.start_stream_encryption(StreamName='stream', EncryptionType='KMS', KeyId=key_arn)
# 3. Monitor shard metrics
shard_metrics = get_stream_metric('events-stream', 'IncomingBytes')
# 4. Use KCL for consumers (handles leases, checkpoints, and resharding)
from amazon_kclpy import kcl
# 5. Size shard count from peak throughput (see Performance Tuning)
shards = max(math.ceil(peak_in_mbps / 1), math.ceil(peak_out_mbps / 2))

DON’T

# 1. Don't ignore retention period
# Old data costs money
# 2. Don't mix Kinesis and Kafka for same pipeline
# Choose one for consistency
# 3. Don't forget enhanced fan-out cost
# Each consumer = 2MB/s dedicated throughput
# 4. Don't use small records inefficiently
# Batch small records together
# 5. Don't ignore CloudWatch costs
# Monitoring and logging add up

Key Takeaways

  1. Fully managed: Minimal operations overhead
  2. Auto-scaling: On-demand capacity mode adjusts throughput automatically; provisioned streams still require resharding
  3. Integrated: Seamless integration with AWS services
  4. Firehose: Auto-delivery to S3, Redshift, etc.
  5. Analytics: SQL queries on streams
  6. Cost: Higher at scale than self-hosted Kafka
  7. AWS lock-in: Tightly coupled to AWS ecosystem
  8. Simpler operations: No broker management required
