
This 60–120 minute guide walks you through setting up a telemetry and A/B testing pipeline:
- Unity/Unreal client instrumentation with a typed persistent queue, HMAC, exponential back-off, and flushing on pause/quit.
- AWS ingestion (HTTP collector → Kinesis Data Streams → Kinesis Firehose → S3 Parquet) with KMS encryption, a DLQ, the Glue Schema Registry, and Parquet conversion.
- Stream processing (Apache Flink/Kinesis Data Analytics) with exactly-once semantics: checkpointing, state backend, watermarks.
- SQL queries for retention, funnels, and monetization on BigQuery/Snowflake/Athena+Glue.
- Feature flags and controlled A/B testing: randomization, sequential testing (alpha spending), multiple-comparison corrections.
- Observability (OpenTelemetry), security (DLP, PII masking), and the GDPR lifecycle (consent, purge, anonymization).
Prerequisites
- Unity 2021+ or Unreal 5+
- An AWS account with Terraform rights for IAM, KMS, Kinesis, Firehose, S3, SQS, Lambda, and Glue
- A SQL warehouse: BigQuery, Snowflake, or S3+Athena/Glue
- A feature-flag solution (LaunchDarkly, Unleash, OpenFeature)
- An experimentation tool (alpha spending, sequential, Bayesian)
1) Versioned event schema
Define a versioned JSON Schema (with Avro/Parquet equivalents downstream) and register it in the Glue Schema Registry or a catalog (DataHub/Amundsen):
{
  "$schema": "https://exemple.com/schemas/game_event/1-0-0.json",
  "type": "object",
  "required": ["event_id", "event_name", "occurred_at", "user_id", "session_id"],
  "properties": {
    "event_id": {"type": "string", "format": "uuid"},
    "event_name": {"type": "string"},
    "occurred_at": {"type": "string", "format": "date-time"},
    "received_at": {"type": "string", "format": "date-time"},
    "user_id": {"type": "string"},
    "session_id": {"type": "string"},
    "build_id": {"type": "string"},
    "platform": {"type": "string", "enum": ["ios", "android", "pc", "console"]},
    "mechanic_version": {"type": "string"},
    "attributes": {"type": "object", "additionalProperties": true}
  }
}
Take care to limit the cardinality of attributes and to manage schema evolution through compatibility rules.
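To enforce the contract early, payloads can be checked against this schema at the collector. A minimal sketch with Python's jsonschema package; the file path and sample event are illustrative, and the date-time format check only kicks in when rfc3339-validator is installed:

import json
from jsonschema import validate, ValidationError, FormatChecker

# Load the versioned schema shown above (path is illustrative)
with open("schemas/game_event_1_0_0.json") as f:
    schema = json.load(f)

event = {
    "event_id": "3f2a9c1e-5b7d-4e21-9c0a-1f2e3d4c5b6a",
    "event_name": "session_start",
    "occurred_at": "2024-01-01T12:00:00Z",
    "user_id": "hmac-of-raw-id",
    "session_id": "9b8c7d6e-5f4a-4b3c-8d2e-1a0b9c8d7e6f",
}

try:
    # FormatChecker also enforces the uuid format on event_id
    validate(instance=event, schema=schema, format_checker=FormatChecker())
except ValidationError as err:
    print("rejected:", err.message)  # route to a DLQ in production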
2) Unity instrumentation (C#)
Example of a robust TelemetryManager: typed persistent queue, safe serialization/deserialization, and a Unity coroutine for HTTP.

using System;
using System.IO;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using UnityEngine.Networking;
using Newtonsoft.Json;
using System.Security.Cryptography;
using System.Text;

public class TelemetryManager : MonoBehaviour
{
    const string ENDPOINT = "https://collector.exemple/v1/events";
    string queuePath;
    readonly Queue<string> queue = new Queue<string>();
    string userRawId, userId, sessionId;
    readonly string salt = "your_secret_salt"; // inject from secure config; never ship hardcoded

    void Awake()
    {
        queuePath = Path.Combine(Application.persistentDataPath, "events.queue.json");
        bool consent = PlayerPrefs.GetInt("telemetry_consent", 0) == 1;
        if (!consent) return;
        userRawId = PlayerPrefs.GetString("uid_raw", Guid.NewGuid().ToString());
        PlayerPrefs.SetString("uid_raw", userRawId);
        sessionId = Guid.NewGuid().ToString();
        userId = ComputeHmac(userRawId, salt); // only the pseudonymized ID leaves the device
        LoadQueue();
        Enqueue(CreateEvent("session_start"));
        Application.quitting += OnAppQuit;
        InvokeRepeating(nameof(PeriodicFlush), 5f, 5f);
    }

    // MonoBehaviour callback; Application.pauseStateChanged is editor-only.
    void OnApplicationPause(bool paused)
    {
        if (paused && queue.Count > 0) StartCoroutine(FlushCoroutine());
    }

    string ComputeHmac(string data, string key)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
        return Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(data)));
    }

    string CreateEvent(string name, object attrs = null)
    {
        var ev = new
        {
            event_id = Guid.NewGuid().ToString(),
            event_name = name,
            occurred_at = DateTime.UtcNow.ToString("o"),
            received_at = (string)null, // stamped server-side by the collector
            user_id = userId,
            session_id = sessionId,
            build_id = Application.version,
            // Map RuntimePlatform values onto the schema enum (ios/android/pc/console) in production.
            platform = Application.platform.ToString().ToLower(),
            mechanic_version = "v1",
            attributes = attrs
        };
        return JsonConvert.SerializeObject(ev);
    }

    void Enqueue(string payload)
    {
        queue.Enqueue(payload);
        SaveQueue();
    }

    void LoadQueue()
    {
        if (!File.Exists(queuePath)) return;
        var list = JsonConvert.DeserializeObject<List<string>>(File.ReadAllText(queuePath));
        if (list == null) return;
        foreach (var e in list) queue.Enqueue(e);
    }

    void SaveQueue()
    {
        // Write-then-rename so a crash mid-write cannot corrupt the queue file.
        var tmp = queuePath + ".tmp";
        File.WriteAllText(tmp, JsonConvert.SerializeObject(queue.ToArray()));
        if (File.Exists(queuePath)) File.Delete(queuePath);
        File.Move(tmp, queuePath);
    }

    void PeriodicFlush()
    {
        if (queue.Count == 0) return;
        StartCoroutine(FlushCoroutine());
    }

    IEnumerator FlushCoroutine()
    {
        // Entries are already serialized JSON, so join them raw instead of
        // re-serializing them (which would escape each event as a string).
        int count = queue.Count;
        var json = "{\"events\":[" + string.Join(",", queue.ToArray()) + "]}";
        using var req = new UnityWebRequest(ENDPOINT, "POST")
        {
            uploadHandler = new UploadHandlerRaw(Encoding.UTF8.GetBytes(json)),
            downloadHandler = new DownloadHandlerBuffer()
        };
        req.SetRequestHeader("Content-Type", "application/json");
        yield return req.SendWebRequest();
        if (req.result == UnityWebRequest.Result.Success)
        {
            // Drop only what was sent; events enqueued mid-flight survive.
            for (int i = 0; i < count; i++) queue.Dequeue();
            SaveQueue();
        }
        // On failure the queue stays on disk and the 5 s repeating flush acts
        // as a retry; add exponential back-off with jitter in production.
    }

    void OnAppQuit()
    {
        Enqueue(CreateEvent("session_end"));
        SaveQueue();
    }
}
3) AWS ingestion (Terraform)
This pipeline creates: a Kinesis Data Stream, a Firehose delivery stream to S3 Parquet, a DLQ, a KMS key, a Glue Schema Registry, and a validation Lambda.

provider "aws" { region = "us-east-1" }
# KMS pour chiffrement
resource "aws_kms_key" "firehose" {
description = "KMS Firehose"
}
# S3 bucket chiffré
resource "aws_s3_bucket" "events" {
bucket = "game-events-lake"
server_side_encryption_configuration {
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.firehose.key_id
}
}
}
}
# DLQ pour Firehose
resource "aws_sqs_queue" "dlq" {
name = "firehose-dlq"
message_retention_seconds = 1209600
}
# Rôle IAM Firehose
data "aws_iam_policy_document" "firehose_assume" {
statement {
effect = "Allow"
principals { type = "Service" identifiers = ["firehose.amazonaws.com"] }
actions = ["sts:AssumeRole"]
}
}
resource "aws_iam_role" "firehose_role" {
assume_role_policy = data.aws_iam_policy_document.firehose_assume.json
}
data "aws_iam_policy_document" "firehose_policy" {
statement {
effect = "Allow"
actions = [
"s3:PutObject","s3:GetBucketLocation","s3:AbortMultipartUpload",
"kms:Encrypt","kms:GenerateDataKey"
]
resources = ["${aws_s3_bucket.events.arn}/*", aws_kms_key.firehose.arn]
}
statement {
effect = "Allow"
actions = ["kinesis:PutRecord","kinesis:PutRecords"]
resources = [aws_kinesis_stream.events.arn]
}
}
resource "aws_iam_role_policy" "firehose_attach" {
role = aws_iam_role.firehose_role.id
policy = data.aws_iam_policy_document.firehose_policy.json
}
# Kinesis Data Stream
resource "aws_kinesis_stream" "events" {
name = "game-events-stream"
shard_count = 2
retention_period = 24
}
# Firehose Delivery Stream
resource "aws_kinesis_firehose_delivery_stream" "to_s3" {
name = "events-to-s3"
destination = "extended_s3"
extended_s3_configuration {
bucket_arn = aws_s3_bucket.events.arn
role_arn = aws_iam_role.firehose_role.arn
buffer_size = 64
buffer_interval = 60
compression_format = "PARQUET"
error_output_prefix = "errors/!{firehose:error-output-type}/"
data_format_conversion_configuration {
enabled = true
input_format_configuration {
deserializer { hive_json_ser_de {} }
}
output_format_configuration {
serializer { parquet_ser_de {} }
}
}
dynamic_partitioning_configuration { enabled = true }
processing_configuration {
enabled = true
processors {
type = "Lambda"
parameters {
parameter_name = "LambdaArn"
parameter_value = aws_lambda_function.validate_schema.arn
}
}
}
s3_backup_mode = "FailedDataOnly"
error_output_prefix = "dlq/"
s3_backup_configuration {
role_arn = aws_iam_role.firehose_role.arn
bucket_arn = aws_s3_bucket.events.arn
}
}
kinesis_source_configuration {
kinesis_stream_arn = aws_kinesis_stream.events.arn
role_arn = aws_iam_role.firehose_role.arn
}
cloudwatch_logging_options {
enabled = true
log_group_name = "/aws/kinesisfirehose/events"
log_stream_name = "to-s3"
}
}
# Glue Schema Registry
resource "aws_glue_registry" "events" {
name = "game-events-registry"
description = "Registry des schemas d’événements"
}
resource "aws_glue_schema" "event_schema" {
name = "game_event_schema"
data_format = "JSON"
compatibility = "BACKWARD"
registry_id = aws_glue_registry.events.id
schema_definition = file("schemas/game_event_1_0_0.json")
}
# Lambda de validation
data "aws_iam_policy_document" "lambda_assume" {
statement {
effect = "Allow"
principals { type = "Service" identifiers = ["lambda.amazonaws.com"] }
actions = ["sts:AssumeRole"]
}
}
resource "aws_iam_role" "lambda_role" {
assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}
resource "aws_iam_role_policy" "lambda_policy" {
role = aws_iam_role.lambda_role.id
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = ["glue:GetSchemaVersion","kms:Decrypt"],
Resource = ["${aws_glue_schema.event_schema.arn}*","${aws_kms_key.firehose.arn}"]
}
]
})
}
resource "aws_lambda_function" "validate_schema" {
function_name = "validate_game_event"
filename = "validate_schema.zip"
handler = "index.handler"
runtime = "nodejs18.x"
role = aws_iam_role.lambda_role.arn
environment { variables = { SCHEMA_REGISTRY = aws_glue_registry.events.name } }
}
4) Stream processing
- Stack: Apache Flink/Kinesis Data Analytics in exactly-once mode.
- Checkpointing to S3 or a DFS (RocksDB state backend), 10 s interval.
- Watermarks to handle late events (e.g. 2 min of allowed lateness).
- Deduplication on event_id: 24 h tumbling window, state TTL > 24 h (sketched after this list).
- State sizing options: state TTL, state store size, Bloom filters to cut memory (at a false-positive trade-off).
- Geo/IP enrichment via MaxMind in a UDF or a parallel job.
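To make the deduplication concrete, here is a minimal PyFlink sketch using keyed state with a 24 h TTL; events_stream is an assumed upstream DataStream keyed by event_id, and the Kinesis source/sink wiring is omitted:

from pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

class DedupById(KeyedProcessFunction):
    """Forwards the first occurrence of each event_id; drops duplicates for 24 h."""

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor("seen", Types.BOOLEAN())
        # State expires 24 h after creation, bounding the RocksDB store size.
        ttl = StateTtlConfig.new_builder(Time.hours(24)) \
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
            .cleanup_full_snapshot() \
            .build()
        descriptor.enable_time_to_live(ttl)
        self.seen = runtime_context.get_state(descriptor)

    def process_element(self, event, ctx):
        if self.seen.value() is None:  # first time this event_id is seen
            self.seen.update(True)
            yield event

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(10_000)  # 10 s interval; exactly-once is Flink's default
# deduped = events_stream.key_by(lambda e: e["event_id"]).process(DedupById())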
5) Analytical SQL queries
Example: D1/D7 retention (BigQuery syntax):
WITH starts AS (
  SELECT user_id, MIN(DATE(occurred_at)) AS day0  -- first session day per user
  FROM events
  WHERE event_name = 'session_start'
  GROUP BY user_id
),
retention AS (
  SELECT
    DATE_DIFF(DATE(e.occurred_at), s.day0, DAY) AS day_diff,
    COUNT(DISTINCT s.user_id) AS users
  FROM events e
  JOIN starts s USING (user_id)
  WHERE DATE_DIFF(DATE(e.occurred_at), s.day0, DAY) IN (1, 7)
  GROUP BY day_diff
)
SELECT * FROM retention;
6) A/B testing and statistics
- Configure a 50/50 flag in your feature-flagging solution.
- Use a two-sided test by default; go one-sided only for a directional hypothesis.
- For an uplift from 35% to 37% at α = 0.05 and power = 0.8:
from statsmodels.stats.power import NormalIndPower
# proportion_effectsize lives in statsmodels.stats.proportion,
# not on NormalIndPower
from statsmodels.stats.proportion import proportion_effectsize

power, alpha = 0.8, 0.05
p0, p1 = 0.35, 0.37
analysis = NormalIndPower()
n = analysis.solve_power(
    effect_size=proportion_effectsize(p1, p0),
    power=power, alpha=alpha, ratio=1
)
print(int(n))  # users per variant
- Sequential testing: use alpha spending (libraries such as seqdesign or alphaspending) and define the stopping rules before launch.
- Adjust for multiple KPIs: Bonferroni or FDR (see the sketch below).
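For the multiple-KPI corrections, statsmodels ships both approaches named above; a minimal sketch with made-up p-values:

from statsmodels.stats.multitest import multipletests

# Illustrative p-values for, say, retention, session length, and ARPU
p_values = [0.012, 0.034, 0.210]

for method in ("bonferroni", "fdr_bh"):  # Bonferroni and Benjamini-Hochberg FDR
    reject, p_adjusted, _, _ = multipletests(p_values, alpha=0.05, method=method)
    print(method, reject, p_adjusted.round(3))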

7) Observability, security, and GDPR
- OpenTelemetry instrumentation (client→collector traces, correlated by event_id; see the sketch after this list).
- PII masking with AWS Macie or a DLP Lambda over S3.
- Consent that is persistent, revocable, and timestamped.
- Automatic purge/anonymization via Lambda + EventBridge on the legally required schedule.
- Pipeline monitoring: collector latency, Kinesis lag, error/DLQ rates, cost in € per 1,000 events.
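As a sketch of the OpenTelemetry point above, minimal collector-side tracing with the Python SDK; the console exporter and the handle_batch pipeline are illustrative (point an OTLP exporter at your backend in production):

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(ConsoleSpanExporter())
)
tracer = trace.get_tracer("collector")

def handle_batch(events: list[dict]) -> None:
    for event in events:
        with tracer.start_as_current_span("ingest_event") as span:
            # Correlate server-side processing with the client event
            span.set_attribute("event.id", event["event_id"])
            span.set_attribute("event.name", event["event_name"])
            # ... validate, enrich, and forward to Kinesis here ...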
You now have a scalable, fault-tolerant, GDPR-compliant pipeline, ready to support your core mechanics and your A/B experiments.