🏭 Amazon Kinesis Data Streams

ベルトコンベアシステムで理解する超入門ガイド

🏭 工場のベルトコンベア → 📦 商品の流れ → 📊 リアルタイム処理

Kinesis Data Streamsを、身近な工場のベルトコンベアシステムで例えて分かりやすく説明します!

🤔 まず、Kinesis Data Streamsって何?

📝 簡単に言うと...

大量のデータ リアルタイム で、 複数のベルトコンベア を使って 効率的に処理 するAWSのサービスです

🏭 工場のベルトコンベア

作業員 商品
ベルトコンベアに載せる


👆 複数のラインで
🔄 同時に大量処理


📦 商品が順番に流れて
🏭 各工程で処理 される

☁️ Kinesis Data Streams

アプリケーション データ
ストリームに送信


👆 複数のシャードで
🔄 同時に大量処理


📊 データが順番に流れて
⚙️ 各システムで処理 される

🏭 重要な5つの要素をベルトコンベアで理解

📊

Data Stream
= 工場全体のコンベアシステム

データの流れ全体 を管理します。
工場の複数のベルトコンベアを統括する システム全体 のようなものです。

具体例: ログ解析システム
名前: my-log-stream
🔄

Shard
= 個々のベルトコンベアライン

実際のデータ処理ライン です。
工場の 1本1本のベルトコンベア で、それぞれが独立してデータを処理します。

処理能力: 1MB/秒 または 1,000レコード/秒
特徴: 自動で負荷分散
👷

Producer
= 商品を載せる作業員

データを送信する側 です。
作業員が商品をベルトコンベアに載せるように、 アプリケーションがデータを送信 します。

例: Webサーバー、IoTデバイス
役割: データの投入
📦

Consumer
= 商品を受け取る作業員

データを受信・処理する側 です。
ベルトコンベアから商品を受け取る作業員のように、 データを受信して処理 します。

例: 分析システム、データベース
役割: データの取得・処理
🏷️

Partition Key
= 商品の仕分けタグ

データの分散ルール です。
商品に貼る 「食品」「電子機器」 などのタグで、どのラインに送るかを決めます。

例: ユーザーID、地域コード
効果: 同じキーのデータは同じシャードへ

📦 データレコード = 荷札付きの商品

🏷️ 3つの情報が付いた商品

データレコードは、ベルトコンベアに流れる「荷札付きの商品」のようなものです

🎁

Data
= 商品の中身

実際に送りたいデータ です。
商品の中身のように、JSON、CSV、画像など 任意の形式 で送れます。

最大サイズ: 1MB
形式: JSON、CSV、バイナリなど自由
🏷️

Partition Key
= 配送先ラベル

どのライン(Shard)に送るか を決めます。
商品に貼る配送先ラベルのように、 仕分けの基準 になります。

例: "user123", "device_001"
役割: 同じキーは同じShardへ
🔢

Sequence Number
= 商品の通し番号

Kinesisが自動で付ける 一意の番号です。
工場の通し番号のように、 そのライン内での順序 を保証します。

自動生成: ユーザーは設定不要
用途: 順序保証、重複防止

📋 データレコードの構造

{ "Data": "{\"user_id\":\"123\",\"action\":\"click\"}", "PartitionKey": "user123", "SequenceNumber": "49590338271490256608559692538361571095921575989136588610" }

この3つの要素がセットで、1つのデータレコードを構成します

🔢 シーケンス番号の仕組み

📊 ベルトコンベアでの通し番号システム

各ライン(Shard)ごとに、商品に通し番号を付けて順序を管理します

🏭 各ラインでの通し番号管理

🔄 Shard 1(ユーザーAliceのライン)

📝
Login
Seq: 001
👀
View Page
Seq: 002
🛒
Add Cart
Seq: 003

🔄 Shard 2(ユーザーBobのライン)

📝
Login
Seq: 001
🔍
Search
Seq: 002
💳
Purchase
Seq: 003

⚡ シーケンス番号の重要な特徴

✅ Shard内での順序保証: 同じShard内では必ず番号順に処理

❌ Shard間では順序保証なし: 異なるShardの番号に関連性はなし

🔢 非常に大きな数値: 文字列形式の長い番号

🤖 自動生成: Kinesisが自動で採番、ユーザーは設定不要

📈 実際のデータ送受信の流れ

1

Producer側:データ送信

// ユーザーAliceのアクション await kinesis.putRecord({ StreamName: 'user-actions', Data: JSON.stringify({user: 'alice', action: 'login'}), PartitionKey: 'alice' }); // → Shard 1, Sequence: 49590...001 (自動生成) await kinesis.putRecord({ StreamName: 'user-actions', Data: JSON.stringify({user: 'alice', action: 'view_page'}), PartitionKey: 'alice' }); // → Shard 1, Sequence: 49590...002 (自動生成)
2

Consumer側:データ受信

// Shard 1から順番に受信(必ず送信順) { "SequenceNumber": "49590338271490256608559692538361571095921575989136588610", "Data": "{\"user\":\"alice\",\"action\":\"login\"}", "PartitionKey": "alice" } { "SequenceNumber": "49590338271490256608559692538361571095921575989136588611", "Data": "{\"user\":\"alice\",\"action\":\"view_page\"}", "PartitionKey": "alice" } // ↑ 必ず login → view_page の順序で受信される!

🎯 シーケンス番号の活用方法

🚫

重複処理の防止

処理済みシーケンス番号を記録して、同じデータの重複処理を防止

📍

処理再開ポイント

最後に処理した番号を保存して、障害復旧時に正確な位置から再開

📊

データ順序の確認

番号の連続性をチェックして、データ欠損や順序異常を検出

🔗

一意識別子として

Shard ID + シーケンス番号で、全データレコードを一意に識別

💡 実践的なコード例

🚫 重複処理を防ぐコード
📍 処理再開ポイントの保存
📊 データ順序の監視

⚠️ よくある誤解と注意点

❌ よくある間違い

🔢 整数だと思う
→ 実際は非常に長い文字列


🌍 全Shard通じてユニーク
→ 実際はShard内でのみユニーク


✏️ 自分で設定できる
→ 実際はKinesisが自動生成


📈 連続する数値
→ 実際は大きく跳ぶことがある

✅ 正しい理解

📝 文字列として扱う
→ `record.SequenceNumber` (string)


🔗 ShardID + 番号で一意
→ `${shardId}-${sequenceNumber}`


🤖 完全に自動
→ ユーザーは一切設定不要


📊 文字列比較で順序判定
→ `seqA.localeCompare(seqB)`

🎯 実装時のベストプラクティス

📝 常に文字列として扱う: 数値変換は行わない

💾 チェックポイントを定期保存: 障害時の復旧を考慮

🔍 順序監視を実装: データ欠損の早期発見

⚡ 重複処理を考慮: 冪等性のある処理を心がける

📊 メトリクス収集: 処理遅延やエラー率を監視

🔄 実際のベルトコンベアの動きを見てみよう

📊 Kinesis Data Stream の動作イメージ

👷‍♂️

Producer
(データ送信者)

例:

  • Webアプリケーション
  • IoTセンサー
  • ログ収集システム

Data Stream: my-log-stream

Shard 1
📊
1MB/秒
Shard 2
📈
1MB/秒
Shard 3
📉
1MB/秒

合計: 3MB/秒の処理能力

🤖

Consumer
(データ受信者)

例:

  • 分析システム
  • データウェアハウス
  • 機械学習パイプライン

📈 データの流れの特徴

🔄 リアルタイム: データがベルトコンベアのように連続して流れます

⚖️ 負荷分散: Partition Keyに基づいて複数のShardに自動分散

📊 順序保証: 同じシャード内ではデータの順序が保証されます

🔒 耐久性: 24時間〜365日間データを保持(設定可能)

🏷️ Partition Key(仕分けタグ)の仕組み

📦 商品の仕分けシステム

Partition Keyは、商品(データ)をどのベルトコンベア(Shard)に送るかを決める重要な仕組みです

🏷️ 仕分け作業の流れ

1

商品(データ)にタグを付ける

Producer側で設定:
商品に「ユーザーID: user123」というタグを付けます
工場の例: 「食品」「電子機器」「衣料品」のラベルを貼る

2

タグを見てライン(Shard)を決定

ハッシュ関数で計算:
「user123」→ ハッシュ計算 → Shard 2に決定
工場の例: 「食品」タグ → ライン2、「電子機器」タグ → ライン1

3

同じタグは必ず同じラインへ

順序保証の仕組み:
「user123」のデータは必ずShard 2に送られます
工場の例: 「食品」は必ずライン2で、順番通りに処理される

⚖️

負荷分散

適切なPartition Keyで、データを複数のShardに均等に分散

📊

順序保証

同じKeyのデータは同じShardで順番通りに処理される

🎯

効率的処理

関連データをまとめて処理することで効率アップ

🔧

設計の重要性

適切なKey設計で、システム全体のパフォーマンスが決まる

💡 良いPartition Keyの選び方

✅ 良い例:

  • ユーザーID: ユーザーごとのアクションを順序保証
  • デバイスID: IoTデータの順序を保持
  • 注文ID: 注文処理の流れを順番通りに

❌ 避けるべき例:

  • 固定値: 全データが1つのShardに集中
  • タイムスタンプ: 時間によって偏りが発生
  • ランダム値: 関連データがバラバラに

📋 実際の設定手順(超簡単版)

1

工場(Data Stream)を建設

AWSコンソールでData Stream作成
→ ストリーム名を決める(例: my-log-stream)
→ 必要なShard数を決める(処理量に応じて)

aws kinesis create-stream \ --stream-name my-log-stream \ --shard-count 3
2

作業員(Producer)を配置

データ送信アプリケーションを開発
→ AWS SDKを使ってデータ送信
→ Partition Keyを適切に設定

// JavaScript例 const params = { StreamName: 'my-log-stream', Data: JSON.stringify(logData), PartitionKey: 'user123' }; kinesis.putRecord(params);
3

受取人(Consumer)を配置

データ処理アプリケーションを開発
→ Kinesis Client Libraryを使用
→ 受信したデータを適切に処理

// Consumer設定例 const consumer = new KinesisConsumer({ streamName: 'my-log-stream', region: 'ap-northeast-1' }); consumer.on('data', (record) => { // データ処理ロジック console.log(record); });
4

監視システムを設置

CloudWatchでモニタリング
→ Shard使用率、エラー率を監視
→ 必要に応じてShard数を調整

5

運用開始!

リアルタイムデータ処理スタート
→ ベルトコンベアが動き始める!
→ データが連続して流れて処理される

🚚 シャードのデータ量制限

⚖️ ベルトコンベアの処理能力

各シャード(ベルトコンベアライン)には明確な処理能力の上限があります

🏭 工場のベルトコンベア

搬入能力
1秒間に1トンの荷物
または1,000個の商品


搬出能力
1秒間に2トンの荷物を
作業員が取り出し可能


1個の荷物
最大1トンまで

☁️ Kinesis Shard

書き込み制限
1秒間に1MBのデータ
または1,000レコード


読み込み制限
1秒間に2MBのデータを
Consumerが取得可能


1レコード
最大1MBまで

📦

書き込み制限
(Producer側)

1MB/秒 または 1,000レコード/秒
のいずれか小さい方

📥

読み込み制限
(Consumer側)

2MB/秒
Enhanced Fan-Outなら
5Consumer × 2MB/秒

📋

レコードサイズ
(個別制限)

最大1MB
1つのレコードの
上限サイズ

バッチ処理
(PutRecords API)

500レコード または
5MB /リクエスト
のいずれか小さい方

📊 API別の詳細制限

API

PutRecord API(1個ずつ送信)

制限: 1,000回/秒, 1MB/秒
用途: リアルタイム送信、少量データ

// 1つずつ送信 await kinesis.putRecord({ StreamName: 'my-stream', Data: JSON.stringify(userData), // 最大1MB PartitionKey: 'user123' }); // 制限: このAPIを1秒間に1,000回まで呼び出し可能
API

PutRecords API(まとめて送信)

制限: 500レコード/リクエスト, 5MB/リクエスト
用途: バッチ処理、高効率送信

// まとめて送信(推奨) await kinesis.putRecords({ StreamName: 'my-stream', Records: [ { Data: data1, PartitionKey: key1 }, { Data: data2, PartitionKey: key2 }, // ... 最大500個、合計5MBまで ] }); // より効率的でスループットが高い
API

GetRecords API(データ取得)

制限: 5回/秒, 2MB/秒, 10,000レコード/リクエスト
用途: Consumer側でのデータ取得

// データ取得 const response = await kinesis.getRecords({ ShardIterator: iterator, Limit: 100 // 最大10,000レコード/リクエスト }); // 制限: 1つのShardに対して5回/秒まで // 実際の取得量: 2MB/秒まで

⚠️ 制限超過時の動作(スロットリング)

🚫 ProvisionedThroughputExceededException が発生:

  • 1MB/秒 または 1,000レコード/秒を超過
  • API呼び出しが拒否される
  • 自動的にリトライが必要

📈 工場の例: ベルトコンベアの処理能力を超えて荷物を載せようとすると、「処理能力オーバー」で拒否される

🎯 制限対策とベストプラクティス

🔄

指数バックオフリトライ

スロットリング発生時の対応
工場が混雑時、少し待ってから再度搬入するように、徐々に間隔を空けてリトライ

const retryWithBackoff = async (operation, maxRetries = 5) => { for (let i = 0; i < maxRetries; i++) { try { return await operation(); } catch (error) { if (error.code === 'ProvisionedThroughputExceededException') { const delay = Math.pow(2, i) * 100; // 100ms, 200ms, 400ms... await new Promise(resolve => setTimeout(resolve, delay)); } else { throw error; } } } };
📦

効率的なバッチング

PutRecords APIの活用
1つずつ運ぶより、まとめて運ぶ方が効率的。最大500レコードをバッチ処理

const batchPutRecords = async (records) => { const BATCH_SIZE = 500; const batches = []; for (let i = 0; i < records.length; i += BATCH_SIZE) { batches.push(records.slice(i, i + BATCH_SIZE)); } for (const batch of batches) { await retryWithBackoff(() => kinesis.putRecords({ StreamName: 'my-stream', Records: batch }) ); } };
⚖️

適切なShard数の計算

必要な処理能力から逆算
工場の処理量に応じてライン数を増やすように、データ量に応じてShard数を決定

// 必要なShard数の計算 const calculateRequiredShards = (dataRate) => { const SHARD_LIMIT_MB_PER_SEC = 1; const SHARD_LIMIT_RECORDS_PER_SEC = 1000; const shardsByDataSize = Math.ceil(dataRate.mbPerSec / SHARD_LIMIT_MB_PER_SEC); const shardsByRecordCount = Math.ceil(dataRate.recordsPerSec / SHARD_LIMIT_RECORDS_PER_SEC); // より制約の厳しい方を採用 return Math.max(shardsByDataSize, shardsByRecordCount); }; // 例: 3MB/秒, 2,500レコード/秒の場合 const required = calculateRequiredShards({ mbPerSec: 3, recordsPerSec: 2500 }); console.log(`必要なShard数: ${required}`); // 3個
📊

レート制御の実装

送信レートの調整
工場の処理能力に合わせて搬入ペースを調整するように、データ送信レートを制御

class RateLimitedProducer { constructor(ratePerSecond = 800) { // 1000の80%で安全マージン this.ratePerSecond = ratePerSecond; this.lastSent = 0; this.tokens = ratePerSecond; } async sendRecord(record) { await this.waitForToken(); return kinesis.putRecord(record); } async waitForToken() { const now = Date.now(); const elapsed = (now - this.lastSent) / 1000; this.tokens = Math.min(this.ratePerSecond, this.tokens + elapsed * this.ratePerSecond); if (this.tokens < 1) { const waitTime = (1 - this.tokens) / this.ratePerSecond * 1000; await new Promise(resolve => setTimeout(resolve, waitTime)); this.tokens = 1; } this.tokens -= 1; this.lastSent = now; } }

💡 実際の計算例

📊 ログデータの送信:1秒間に2,000件のログ(各1KB)
📱 IoTセンサーデータ:1秒間に500件のセンサーデータ(各2KB)
📹 動画メタデータ:1秒間に100件の大きなデータ(各50KB)

🎯 監視すべきメトリクス

📊 CloudWatchで監視:

  • IncomingRecords: 1秒間の受信レコード数
  • IncomingBytes: 1秒間の受信データ量
  • WriteProvisionedThroughputExceeded: スロットリング発生回数
  • ReadProvisionedThroughputExceeded: 読み込みスロットリング

⚠️ アラート設定の目安:

  • 使用率が80%を超えたら警告
  • スロットリングが発生したら即座に通知
  • 処理遅延が閾値を超えたら調査

❓ よくある質問

🤔 Shardはいくつ必要?
💰 料金はどのくらい?
⚡ どのくらいのレイテンシ?
🔧 他のAWSサービスとの連携は?

✨ Kinesis Data Streamsのメリット

リアルタイム処理

数百ミリ秒の超低レイテンシでデータを処理できます

📈

自動スケーリング

データ量に応じてShard数を調整、自動で処理能力を拡張

🔒

高い耐久性

複数のAZでデータを複製、データ損失のリスクを最小化

🔧

豊富な連携

AWS内の様々なサービスとシームレスに連携可能

📊

順序保証

Partition Key単位でデータの順序を保証、整合性を維持

💰

コスト効率

使った分だけの従量課金、無駄なコストを削減

🎯 実際の使用例

📱

リアルタイム分析

ユースケース: アプリのユーザー行動分析
流れ: アプリ → Kinesis → Lambda → DynamoDB
効果: ユーザーの行動をリアルタイムで把握

🌡️

IoTデータ処理

ユースケース: センサーデータの監視
流れ: IoTデバイス → Kinesis → Lambda → CloudWatch
効果: 異常値を即座に検知・アラート

📊

ログ集約・分析

ユースケース: Webサーバーのログ解析
流れ: サーバー → Kinesis → Kinesis Analytics → S3
効果: アクセスパターンの即座な把握

💳

不正検知システム

ユースケース: クレジットカード取引監視
流れ: 取引データ → Kinesis → 機械学習 → アラート
効果: 怪しい取引を瞬時に検出

🎯 まとめ

🏭 Data Stream = 工場全体(複数のベルトコンベアを統括するシステム)

🔄 Shard = 個々のベルトコンベアライン(実際の処理ライン)

👷 Producer = 商品を載せる作業員(データ送信側)

📦 Consumer = 商品を受け取る作業員(データ受信・処理側)

🏷️ Partition Key = 商品の仕分けタグ(どのラインに送るかの決定)


この仕組みで、 大量のデータをリアルタイムで効率的に処理 できます!


🎯 初心者へのアドバイス:

  • 小さく始める → 1〜2Shardから開始
  • Partition Key設計が重要 → ユーザーIDやデバイスIDを活用
  • 監視を忘れずに → CloudWatchでメトリクス確認
  • 他サービスとの連携 → Lambda、S3などと組み合わせて真価発揮

Created by SSuzuki1063

AWS SAP Learning Resources