Amazon Kinesis Guide
Managed Streaming Service on AWS
Overview
Amazon Kinesis is a fully managed streaming service on AWS. It simplifies streaming infrastructure by handling operations like provisioning, scaling, and replication. This guide covers Kinesis Data Streams, Firehose, and Analytics for data engineering workloads.
Kinesis Services
Kinesis Data Streams
Core Concepts
| Concept | Description |
|---|---|
| Shard | Unit of throughput (1MB/s input, 2MB/s output) |
| Producer | Writes data to a stream |
| Consumer | Reads data from a stream |
| Record | Single data unit |
| Sequence Number | Ordering within a shard |
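The partition key determines the shard: Kinesis MD5-hashes the key into a 128-bit hash key and routes the record to the shard owning that hash-key range. The sketch below is our own illustration of that routing rule, not an AWS API; it assumes shards evenly split the hash range, which is the default layout right after stream creation.

```python
import hashlib

def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Approximate which shard a partition key routes to.

    Kinesis MD5-hashes the partition key into a 128-bit hash key and
    delivers the record to the shard whose hash-key range contains it.
    Assumes shards evenly split the full 128-bit range.
    """
    digest = hashlib.md5(partition_key.encode('utf-8')).digest()
    hash_key = int.from_bytes(digest, 'big')
    return hash_key * num_shards // 2 ** 128

# The same key always routes to the same shard, so per-key ordering holds
assert shard_for_key('user_123', 4) == shard_for_key('user_123', 4)
```

Because routing is by key, a hot partition key concentrates traffic on one shard regardless of the total shard count.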
Kinesis Data Streams
Producer Example
```python
import boto3
import json

kinesis = boto3.client('kinesis')

# Create a stream with 4 shards (4MB/s input, 8MB/s output)
kinesis.create_stream(
    StreamName='events-stream',
    ShardCount=4
)

# Put records in batches
def send_events(events):
    records = [{
        'Data': json.dumps(event).encode('utf-8'),
        'PartitionKey': event['user_id']  # Routes each record to a shard
    } for event in events]

    # Batch put (up to 500 records per call); check FailedRecordCount
    # in the response for partial failures
    kinesis.put_records(
        StreamName='events-stream',
        Records=records
    )

# Put a single record
kinesis.put_record(
    StreamName='events-stream',
    Data=json.dumps(event).encode('utf-8'),
    PartitionKey='user_123'
)
```
Consumer Example
```python
import boto3
import json
import time

kinesis = boto3.client('kinesis')

# Get a shard iterator positioned at the latest record
response = kinesis.get_shard_iterator(
    StreamName='events-stream',
    ShardId='shardId-000000000001',
    ShardIteratorType='LATEST'
)
shard_iterator = response['ShardIterator']

# Poll the shard in a loop
while True:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100  # Up to 100 records per call
    )

    # Process records
    for record in response['Records']:
        event = json.loads(record['Data'])
        process_event(event)

    # Always continue from the returned iterator; get_shard_iterator
    # is only needed again if an iterator expires
    shard_iterator = response['NextShardIterator']

    if not response['Records']:
        time.sleep(1)  # Caught up with the stream; back off briefly
```
Kinesis Client Library (KCL)
Enhanced Fan-Out
Enhanced fan-out: each registered consumer gets its own dedicated 2MB/s of read throughput per shard, instead of sharing the standard 2MB/s per shard with all other consumers.
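The dedicated pipe can be consumed directly with boto3's `register_stream_consumer` and `subscribe_to_shard` (records are pushed over HTTP/2). A minimal sketch, assuming AWS credentials and a region are configured; stream, consumer, and shard names are placeholders:

```python
def consume_enhanced_fan_out(stream_name, consumer_name, shard_id, process_event):
    """Read one shard via enhanced fan-out (dedicated 2MB/s per shard)."""
    import json
    import boto3  # AWS SDK; needs credentials and a region configured

    kinesis = boto3.client('kinesis')
    stream_arn = kinesis.describe_stream(
        StreamName=stream_name)['StreamDescription']['StreamARN']

    # One-time registration of a dedicated consumer on the stream
    consumer = kinesis.register_stream_consumer(
        StreamARN=stream_arn, ConsumerName=consumer_name)

    # A subscription pushes events for up to 5 minutes, after which
    # the caller must re-subscribe
    response = kinesis.subscribe_to_shard(
        ConsumerARN=consumer['Consumer']['ConsumerARN'],
        ShardId=shard_id,
        StartingPosition={'Type': 'LATEST'})
    for event in response['EventStream']:
        for record in event['SubscribeToShardEvent']['Records']:
            process_event(json.loads(record['Data']))
```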
The sketch below uses the `amazon_kclpy` package (the Python KCL, which runs under the Java MultiLangDaemon); method and attribute names follow its v2 interface.

```python
import json

from amazon_kclpy import kcl
from amazon_kclpy.v2 import processor

class EventProcessor(processor.RecordProcessorBase):
    def initialize(self, initialize_input):
        pass  # Called once per shard before processing starts

    def process_records(self, process_records_input):
        for record in process_records_input.records:
            event = json.loads(record.binary_data.decode('utf-8'))
            process_event(event)
        # Checkpoint so a restarted worker resumes from here
        process_records_input.checkpointer.checkpoint()

    def shutdown(self, shutdown_input):
        if shutdown_input.reason == 'TERMINATE':
            shutdown_input.checkpointer.checkpoint()

if __name__ == '__main__':
    kcl.KCLProcess(EventProcessor()).run()
```

The stream, application, and polling settings live in a properties file read by the KCL daemon, not in code:

```properties
# event-processor.properties
executableName = event_processor.py
streamName = events-stream
applicationName = event-processor
regionName = us-east-1
initialPositionInStream = LATEST
maxRecords = 100
idleTimeBetweenReadsInMillis = 100
```
Kinesis Firehose
Auto-Delivery
Firehose Configuration
```python
import boto3
import json

firehose = boto3.client('firehose')

# Create a delivery stream that lands data in S3
firehose.create_delivery_stream(
    DeliveryStreamName='events-to-s3',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::account:role/FirehoseRole',
        'BucketARN': 'arn:aws:s3:::events-bucket',  # S3 ARNs contain only the bucket name
        'Prefix': 'events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/',
        'ErrorOutputPrefix': 'errors/'
    }
)
```
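Firehose also accepts batches via `put_record_batch` (up to 500 records or 4MB per call), which can fail individual records rather than the whole request. A sketch of a retry loop; the helper name is ours, and the client is passed in so the logic stays testable:

```python
import json

def put_events_batch(firehose, stream_name, events, max_retries=3):
    """Batch-send events to Firehose, resending only the records that failed."""
    records = [{'Data': (json.dumps(e) + '\n').encode('utf-8')} for e in events]
    for _ in range(max_retries):
        if not records:
            return
        response = firehose.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=records,
        )
        if response['FailedPutCount'] == 0:
            return
        # Entries with an ErrorCode were throttled or rejected; retry them
        records = [rec for rec, res in zip(records, response['RequestResponses'])
                   if 'ErrorCode' in res]
    raise RuntimeError(f'{len(records)} records still failing after retries')
```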
```python
# Put a single record
firehose.put_record(
    DeliveryStreamName='events-to-s3',
    Record={'Data': json.dumps(event).encode('utf-8')}
)
```
Kinesis Analytics
SQL Queries on Streams
A Kinesis Data Analytics (SQL) application reads from an in-application input stream (named `SOURCE_SQL_STREAM_001` by default) and writes results through a pump; a tumbling window is expressed with `STEP` over `ROWTIME`:

```sql
-- Create an in-application output stream
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    user_id      BIGINT,
    event_type   VARCHAR(50),
    event_count  INTEGER
);

-- Pump: a continuous query that fills the output stream
-- using a 60-second tumbling window
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM user_id, event_type, COUNT(*) AS event_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY user_id, event_type,
         STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
```
Performance Tuning
Shard Counting
Required shards = peak throughput ÷ per-shard throughput

Example:
- Peak: 10MB/s in and 10MB/s out
- Per-shard input: 1MB/s
- Per-shard output: 2MB/s

For writes (input): 10MB/s ÷ 1MB/s = 10 shards minimum
For reads (output): 10MB/s ÷ 2MB/s = 5 shards minimum

Use max(10, 5) = 10 shards

Scaling Strategy
Shard splitting: increasing the shard count splits existing shards into child shards. Records already written stay in the parent shards until they expire, and resharding can cause temporary performance degradation.
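The sizing rule above can be sketched as a small helper (function and parameter names are illustrative):

```python
import math

def required_shards(peak_in_mb_s: float, peak_out_mb_s: float) -> int:
    """Minimum shard count: each shard gives 1MB/s input and 2MB/s output."""
    write_shards = math.ceil(peak_in_mb_s / 1.0)
    read_shards = math.ceil(peak_out_mb_s / 2.0)
    return max(write_shards, read_shards)

print(required_shards(10, 10))  # → 10: writes are the bottleneck
```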
Cost Optimization
Pricing Model
| Component | Pricing | Example Cost |
|---|---|---|
| Data Streams | $0.015/GB + $0.012/shard/hour | $0.027/GB (shard+ingestion) |
| Firehose | $0.029/GB + $0.001-0.002/PUT | ~$0.03/GB |
| Analytics | $0.015/GB + $0.02/complex query | ~$0.035/GB |

Rates are illustrative; check current AWS pricing for your region.
Cost Example
Scenario: 100TB/month, 10K messages/sec, 1KB per message
| Component | Monthly Cost |
|---|---|
| Data ingestion | 100TB × $0.015/GB = $1,500 |
| Shards (10 × $0.012/hour × 730 hours) | $87.60 |
| Total | $1,588/month |
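The arithmetic in the table can be reproduced with a small helper; the rates are copied from the example above, not current AWS pricing:

```python
def monthly_cost(tb_per_month, shards, per_gb=0.015, per_shard_hour=0.012):
    """Data Streams cost: ingestion per GB plus shard-hours (~730 hours/month)."""
    ingestion = tb_per_month * 1000 * per_gb    # treating 1TB as 1,000GB
    shard_hours = shards * per_shard_hour * 730
    return ingestion + shard_hours

print(monthly_cost(100, 10))  # → 1587.6 ($1,500 ingestion + $87.60 shards)
```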
Monitoring
CloudWatch Metrics
```python
import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

STREAM_METRICS = [
    'GetRecords.IteratorAgeMilliseconds',
    'GetRecords.Success',
    'PutRecords.Success',
    'IncomingBytes',
    'OutgoingBytes',
]

def get_stream_metrics(stream_name):
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=1)
    metrics = {}
    # get_metric_statistics accepts a single metric per call
    for metric_name in STREAM_METRICS:
        metrics[metric_name] = cloudwatch.get_metric_statistics(
            Namespace='AWS/Kinesis',
            MetricName=metric_name,
            Dimensions=[{'Name': 'StreamName', 'Value': stream_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,  # 5-minute periods
            Statistics=['Average']
        )
    return metrics
```
Alerting Rules
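Rules like the ones below map directly onto CloudWatch alarms. A hedged sketch using boto3's `put_metric_alarm`; the alarm name and evaluation settings are illustrative, and the call requires AWS credentials and a configured region:

```python
def create_lag_alarm(stream_name, threshold_ms=10000):
    """Alarm when consumers fall more than threshold_ms behind the stream tip."""
    import boto3  # AWS SDK; needs credentials and a region configured

    cloudwatch = boto3.client('cloudwatch')
    cloudwatch.put_metric_alarm(
        AlarmName=f'{stream_name}-high-consumer-lag',  # illustrative name
        Namespace='AWS/Kinesis',
        MetricName='GetRecords.IteratorAgeMilliseconds',
        Dimensions=[{'Name': 'StreamName', 'Value': stream_name}],
        Statistic='Maximum',
        Period=60,
        EvaluationPeriods=3,  # 3 consecutive minutes over threshold
        Threshold=threshold_ms,
        ComparisonOperator='GreaterThanThreshold',
    )
```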
```yaml
alerts:
  - name: "High consumer lag"
    metric: GetRecords.IteratorAgeMilliseconds
    threshold: 10000            # 10 seconds behind the tip
    action: "Scale consumers"

  - name: "Low throughput"
    metric: PutRecords.Success
    threshold: 0.95             # < 95% success rate
    action: "Investigate producer"
```
Security
Server-Side Encryption (SSE)
```python
# Create the stream, then enable encryption; create_stream itself
# does not take encryption parameters
kinesis.create_stream(
    StreamName='encrypted-stream',
    ShardCount=2
)

kinesis.start_stream_encryption(
    StreamName='encrypted-stream',
    EncryptionType='KMS',
    KeyId='arn:aws:kms:us-east-1:account:key/12345678-1234-1234-1234-567890'
)
```
Access Control
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:account:stream/events-stream"
    },
    {
      "Effect": "Allow",
      "Action": "kms:GenerateDataKey",
      "Resource": "arn:aws:kms:us-east-1:account:key/12345678-1234-1234-1234-567890"
    }
  ]
}
```

Producers writing to an encrypted stream need `kms:GenerateDataKey` on the key; `kms:Decrypt` is the consumer-side permission (and it is an action, not a condition key).

Comparison with Alternatives
| Feature | Kinesis | Kafka (MSK) | Kafka (self-hosted) |
|---|---|---|---|
| Management | Fully managed | Partially managed | Self-managed |
| Scaling | Automatic (on-demand mode) | Manual rebalancing | Manual rebalancing |
| Operations | Minimal | Medium | High |
| Cost (at scale) | Higher | Medium | Lower |
| Best for | AWS-centric workloads, simplicity | Kafka compatibility, control | Maximum control, lowest cost at scale |
Best Practices
DO
```python
# 1. Use batch puts for efficiency (up to 500 records per call)
kinesis.put_records(StreamName='stream', Records=[...])

# 2. Enable server-side encryption
kinesis.start_stream_encryption(StreamName='stream',
                                EncryptionType='KMS', KeyId=key_arn)

# 3. Monitor shard metrics
shard_metrics = get_stream_metrics('events-stream')

# 4. Use the KCL for consumers; it handles shard discovery,
#    load balancing, and checkpointing

# 5. Set shard count from peak throughput (1MB/s in, 2MB/s out per shard)
shards = max(peak_in_mb_s / 1, peak_out_mb_s / 2)
```
DON'T
```python
# 1. Don't ignore the retention period: retained data costs money

# 2. Don't mix Kinesis and Kafka in the same pipeline: choose one for consistency

# 3. Don't forget enhanced fan-out cost: each registered consumer
#    is billed for its dedicated 2MB/s per shard

# 4. Don't send many tiny records: batch small records together

# 5. Don't ignore CloudWatch costs: monitoring and logging add up
```
Key Takeaways
- Fully managed: Minimal operations overhead
- Auto-scaling: On-demand capacity mode handles throughput changes automatically
- Integrated: Seamless integration with AWS services
- Firehose: Auto-delivery to S3, Redshift, etc.
- Analytics: SQL queries on streams
- Cost: Higher at scale than self-hosted Kafka
- AWS lock-in: Tightly coupled to AWS ecosystem
- Simpler operations: No broker management required
Back to Module 2