ベルトコンベアシステムで理解する超入門ガイド
Kinesis Data Streamsを、身近な工場のベルトコンベアシステムで例えて分かりやすく説明します!
大量のデータ を リアルタイム で、 複数のベルトコンベア を使って 効率的に処理 するAWSのサービスです
作業員
が
商品
を
ベルトコンベアに載せる
👆 複数のラインで
🔄
同時に大量処理
!
📦 商品が順番に流れて
🏭
各工程で処理
される
アプリケーション
が
データ
を
ストリームに送信
👆 複数のシャードで
🔄
同時に大量処理
!
📊 データが順番に流れて
⚙️
各システムで処理
される
データの流れ全体
を管理します。
工場の複数のベルトコンベアを統括する
システム全体
のようなものです。
実際のデータ処理ライン
です。
工場の
1本1本のベルトコンベア
で、それぞれが独立してデータを処理します。
データを送信する側
です。
作業員が商品をベルトコンベアに載せるように、
アプリケーションがデータを送信
します。
データを受信・処理する側
です。
ベルトコンベアから商品を受け取る作業員のように、
データを受信して処理
します。
データの分散ルール
です。
商品に貼る
「食品」「電子機器」
などのタグで、どのラインに送るかを決めます。
データレコードは、ベルトコンベアに流れる「荷札付きの商品」のようなものです
実際に送りたいデータ
です。
商品の中身のように、JSON、CSV、画像など
任意の形式
で送れます。
どのライン(Shard)に送るか
を決めます。
商品に貼る配送先ラベルのように、
仕分けの基準
になります。
Kinesisが自動で付ける
一意の番号です。
工場の通し番号のように、
そのライン内での順序
を保証します。
この3つの要素がセットで、1つのデータレコードを構成します
各ライン(Shard)ごとに、商品に通し番号を付けて順序を管理します
✅ Shard内での順序保証: 同じShard内では必ず番号順に処理
❌ Shard間では順序保証なし: 異なるShardの番号に関連性はなし
🔢 非常に大きな数値: 文字列形式の長い番号
🤖 自動生成: Kinesisが自動で採番、ユーザーは設定不要
処理済みシーケンス番号を記録して、同じデータの重複処理を防止
最後に処理した番号を保存して、障害復旧時に正確な位置から再開
番号の連続性をチェックして、データ欠損や順序異常を検出
Shard ID + シーケンス番号で、全データレコードを一意に識別
🔢 整数だと思う
→ 実際は非常に長い文字列
🌍 全Shard通じてユニーク
→ 実際はShard内でのみユニーク
✏️ 自分で設定できる
→ 実際はKinesisが自動生成
📈 連続する数値
→ 実際は大きく跳ぶことがある
📝 文字列として扱う
→ `record.SequenceNumber` (string)
🔗 ShardID + 番号で一意
→ `${shardId}-${sequenceNumber}`
🤖 完全に自動
→ ユーザーは一切設定不要
📊 文字列比較で順序判定
→ `seqA.localeCompare(seqB)`
📝 常に文字列として扱う: 数値変換は行わない
💾 チェックポイントを定期保存: 障害時の復旧を考慮
🔍 順序監視を実装: データ欠損の早期発見
⚡ 重複処理を考慮: 冪等性のある処理を心がける
📊 メトリクス収集: 処理遅延やエラー率を監視
例:
合計: 3MB/秒の処理能力
例:
🔄 リアルタイム: データがベルトコンベアのように連続して流れます
⚖️ 負荷分散: Partition Keyに基づいて複数のShardに自動分散
📊 順序保証: 同じシャード内ではデータの順序が保証されます
🔒 耐久性: 24時間〜365日間データを保持(設定可能)
Partition Keyは、商品(データ)をどのベルトコンベア(Shard)に送るかを決める重要な仕組みです
Producer側で設定:
商品に「ユーザーID: user123」というタグを付けます
工場の例:
「食品」「電子機器」「衣料品」のラベルを貼る
ハッシュ関数で計算:
「user123」→ ハッシュ計算 → Shard 2に決定
工場の例:
「食品」タグ → ライン2、「電子機器」タグ → ライン1
順序保証の仕組み:
「user123」のデータは必ずShard 2に送られます
工場の例:
「食品」は必ずライン2で、順番通りに処理される
適切なPartition Keyで、データを複数のShardに均等に分散
同じKeyのデータは同じShardで順番通りに処理される
関連データをまとめて処理することで効率アップ
適切なKey設計で、システム全体のパフォーマンスが決まる
✅ 良い例:
❌ 避けるべき例:
AWSコンソールでData Stream作成
→ ストリーム名を決める(例: my-log-stream)
→ 必要なShard数を決める(処理量に応じて)
データ送信アプリケーションを開発
→ AWS SDKを使ってデータ送信
→ Partition Keyを適切に設定
データ処理アプリケーションを開発
→ Kinesis Client Libraryを使用
→ 受信したデータを適切に処理
CloudWatchでモニタリング
→ Shard使用率、エラー率を監視
→ 必要に応じてShard数を調整
リアルタイムデータ処理スタート
→ ベルトコンベアが動き始める!
→ データが連続して流れて処理される
各シャード(ベルトコンベアライン)には明確な処理能力の上限があります
搬入能力
1秒間に1トンの荷物
または1,000個の商品
搬出能力
1秒間に2トンの荷物を
作業員が取り出し可能
1個の荷物
最大1トンまで
書き込み制限
1秒間に1MBのデータ
または1,000レコード
読み込み制限
1秒間に2MBのデータを
Consumerが取得可能
1レコード
最大1MBまで
1MB/秒
または
1,000レコード/秒
のいずれか小さい方
2MB/秒
Enhanced Fan-Outなら
5Consumer × 2MB/秒
最大1MB
1つのレコードの
上限サイズ
500レコード
または
5MB
/リクエスト
のいずれか小さい方
制限:
1,000回/秒, 1MB/秒
用途:
リアルタイム送信、少量データ
制限:
500レコード/リクエスト, 5MB/リクエスト
用途:
バッチ処理、高効率送信
制限:
5回/秒, 2MB/秒, 10,000レコード/リクエスト
用途:
Consumer側でのデータ取得
🚫 ProvisionedThroughputExceededException が発生:
📈 工場の例: ベルトコンベアの処理能力を超えて荷物を載せようとすると、「処理能力オーバー」で拒否される
スロットリング発生時の対応
工場が混雑時、少し待ってから再度搬入するように、徐々に間隔を空けてリトライ
PutRecords APIの活用
1つずつ運ぶより、まとめて運ぶ方が効率的。最大500レコードをバッチ処理
必要な処理能力から逆算
工場の処理量に応じてライン数を増やすように、データ量に応じてShard数を決定
送信レートの調整
工場の処理能力に合わせて搬入ペースを調整するように、データ送信レートを制御
📊 CloudWatchで監視:
⚠️ アラート設定の目安:
数百ミリ秒の超低レイテンシでデータを処理できます
データ量に応じてShard数を調整、自動で処理能力を拡張
複数のAZでデータを複製、データ損失のリスクを最小化
AWS内の様々なサービスとシームレスに連携可能
Partition Key単位でデータの順序を保証、整合性を維持
使った分だけの従量課金、無駄なコストを削減
ユースケース:
アプリのユーザー行動分析
流れ:
アプリ → Kinesis → Lambda → DynamoDB
効果:
ユーザーの行動をリアルタイムで把握
ユースケース:
センサーデータの監視
流れ:
IoTデバイス → Kinesis → Lambda → CloudWatch
効果:
異常値を即座に検知・アラート
ユースケース:
Webサーバーのログ解析
流れ:
サーバー → Kinesis → Kinesis Analytics → S3
効果:
アクセスパターンの即座な把握
ユースケース:
クレジットカード取引監視
流れ:
取引データ → Kinesis → 機械学習 → アラート
効果:
怪しい取引を瞬時に検出
🏭 Data Stream = 工場全体(複数のベルトコンベアを統括するシステム)
🔄 Shard = 個々のベルトコンベアライン(実際の処理ライン)
👷 Producer = 商品を載せる作業員(データ送信側)
📦 Consumer = 商品を受け取る作業員(データ受信・処理側)
🏷️ Partition Key = 商品の仕分けタグ(どのラインに送るかの決定)
この仕組みで、 大量のデータをリアルタイムで効率的に処理 できます!
🎯 初心者へのアドバイス:
Created by SSuzuki1063
AWS SAP Learning Resources