
A complete telemetry and A/B testing pipeline

Damien Larquey
February 23, 2026
7 min read

This 60–120 minute guide walks you through building a telemetry and A/B testing pipeline:

  • Unity/Unreal client instrumentation with a typed persistent queue, HMAC, exponential back-off, and flushing on pause/quit.
  • AWS ingestion (HTTP collector → Kinesis Data Streams → Kinesis Firehose → S3 Parquet) with KMS encryption, a DLQ, Glue Schema Registry and Parquet conversion.
  • Stream processing (Apache Flink / Kinesis Data Analytics) with exactly-once semantics: checkpointing, state backend, watermarks.
  • SQL queries for retention, funnels and monetization on BigQuery/Snowflake/Athena+Glue.
  • Feature flags & controlled A/B testing: randomization, sequential testing (alpha spending), multiple-comparison corrections.
  • Observability (OpenTelemetry), security (DLP, PII masking) and the GDPR lifecycle (consent, purge, anonymization).

Prerequisites

  • Unity 2021+ or Unreal 5+
  • An AWS account with Terraform rights for IAM, KMS, Kinesis, Firehose, S3, SQS, Lambda and Glue
  • A SQL warehouse: BigQuery, Snowflake, or S3 + Athena/Glue
  • A feature-flag solution (LaunchDarkly, Unleash, OpenFeature)
  • An experimentation tool (alpha spending, sequential, Bayesian)

1) Versioned event schema

Define a versioned JSON Schema (with Avro/Parquet targets) and register it in Glue Schema Registry or a catalog (DataHub/Amundsen):

{
  "$schema": "https://exemple.com/schemas/game_event/1-0-0.json",
  "type": "object",
  "required": ["event_id","event_name","occurred_at","user_id","session_id"],
  "properties": {
    "event_id":         {"type":"string","format":"uuid"},
    "event_name":       {"type":"string"},
    "occurred_at":      {"type":"string","format":"date-time"},
    "received_at":      {"type":"string","format":"date-time"},
    "user_id":          {"type":"string"},
    "session_id":       {"type":"string"},
    "build_id":         {"type":"string"},
    "platform":         {"type":"string","enum":["ios","android","pc","console"]},
    "mechanic_version": {"type":"string"},
    "attributes":       {"type":"object","additionalProperties":true}
  }
}

Keep the cardinality of attributes bounded and manage schema evolution through compatibility rules.
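As a sanity check, a payload can be validated against this schema with the Python `jsonschema` package. A minimal sketch — the schema is inlined and trimmed here rather than fetched from the registry, and `is_valid` is an illustrative helper, not part of the pipeline:

```python
from jsonschema import validate, ValidationError

# Trimmed-down version of the game_event 1-0-0 schema above.
SCHEMA = {
    "type": "object",
    "required": ["event_id", "event_name", "occurred_at", "user_id", "session_id"],
    "properties": {
        "event_id":   {"type": "string"},
        "event_name": {"type": "string"},
        "occurred_at": {"type": "string"},
        "user_id":    {"type": "string"},
        "session_id": {"type": "string"},
        "platform":   {"type": "string", "enum": ["ios", "android", "pc", "console"]},
        "attributes": {"type": "object", "additionalProperties": True},
    },
}

def is_valid(event: dict) -> bool:
    """Return True when the event conforms to the schema."""
    try:
        validate(instance=event, schema=SCHEMA)
        return True
    except ValidationError:
        return False

ok = is_valid({
    "event_id": "e1", "event_name": "session_start",
    "occurred_at": "2026-02-23T10:00:00Z",
    "user_id": "u1", "session_id": "s1", "platform": "pc",
})
bad = is_valid({"event_name": "session_start"})  # missing required fields
```

The same check is what the validation Lambda in section 3 performs server-side before events reach Firehose.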

2) Unity instrumentation (C#)

Example of a robust TelemetryManager: typed queue, safe serialization/deserialization, and a Unity coroutine for HTTP.

using System;
using System.IO;
using System.Collections.Generic;
using UnityEngine;
using UnityEngine.Networking;
using Newtonsoft.Json;
using System.Security.Cryptography;
using System.Text;

public class TelemetryManager : MonoBehaviour
{
  const string ENDPOINT = "https://collector.exemple/v1/events";
  string queuePath;
  Queue<string> queue = new Queue<string>();
  string userRawId, userId, sessionId;
  // Placeholder — inject the real secret at build time, never commit it.
  readonly string salt = "votre_secret_salt";

  void Awake()
  {
    queuePath = Path.Combine(Application.persistentDataPath, "events.queue.json");
    bool consent = PlayerPrefs.GetInt("telemetry_consent", 0) == 1;
    if (!consent) return;

    userRawId = PlayerPrefs.GetString("uid_raw", Guid.NewGuid().ToString());
    PlayerPrefs.SetString("uid_raw", userRawId);
    sessionId = Guid.NewGuid().ToString();
    userId = ComputeHmac(userRawId, salt);

    LoadQueue();
    Enqueue(CreateEvent("session_start"));
    Application.quitting += OnAppQuit;
    InvokeRepeating(nameof(PeriodicFlush), 5f, 5f);
  }

  // Runtime pause callback: Application.pauseStateChanged / PauseState are
  // editor-only APIs; MonoBehaviour.OnApplicationPause works in player builds.
  void OnApplicationPause(bool paused)
  {
    if (paused) StartCoroutine(FlushCoroutine());
  }

  string ComputeHmac(string data, string key)
  {
    using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
    return Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(data)));
  }

  string CreateEvent(string name, object attrs = null)
  {
    var ev = new
    {
      event_id = Guid.NewGuid().ToString(),
      event_name = name,
      occurred_at = DateTime.UtcNow.ToString("o"),
      received_at = (string)null,
      user_id = userId,
      session_id = sessionId,
      build_id = Application.version,
      platform = Application.platform.ToString().ToLower(),
      mechanic_version = "v1",
      attributes = attrs
    };
    return JsonConvert.SerializeObject(ev);
  }

  void Enqueue(string payload)
  {
    queue.Enqueue(payload);
    SaveQueue();
  }

  void LoadQueue()
  {
    if (!File.Exists(queuePath)) return;
    var list = JsonConvert.DeserializeObject<List<string>>(File.ReadAllText(queuePath));
    if (list == null) return; // corrupted or empty file
    foreach (var e in list) queue.Enqueue(e);
  }

  void SaveQueue()
  {
    var tmp = queuePath + ".tmp";
    File.WriteAllText(tmp, JsonConvert.SerializeObject(queue.ToArray()));
    if (File.Exists(queuePath)) File.Delete(queuePath);
    File.Move(tmp, queuePath);
  }

  void PeriodicFlush()
  {
    if (queue.Count == 0) return;
    StartCoroutine(FlushCoroutine());
  }

  System.Collections.IEnumerator FlushCoroutine()
  {
    // Snapshot the batch so events enqueued during the request are not lost.
    var snapshot = queue.ToArray();
    if (snapshot.Length == 0) yield break;
    var json = JsonConvert.SerializeObject(new { events = snapshot });
    using var req = new UnityWebRequest(ENDPOINT, "POST")
    {
      uploadHandler = new UploadHandlerRaw(Encoding.UTF8.GetBytes(json)),
      downloadHandler = new DownloadHandlerBuffer()
    };
    req.SetRequestHeader("Content-Type", "application/json");
    yield return req.SendWebRequest();

    if (req.result == UnityWebRequest.Result.Success)
    {
      // Drop only the events that were actually sent.
      for (int i = 0; i < snapshot.Length; i++) queue.Dequeue();
      SaveQueue();
    }
    else
    {
      // Back-off is handled by the caller (5 s period + error rate).
    }
  }

  void OnAppQuit()
  {
    Enqueue(CreateEvent("session_end"));
    SaveQueue();
  }
}
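The retry comment above leaves back-off to the calling code. A common schedule is exponential back-off with full jitter; a language-agnostic sketch of the delay computation (shown in Python, function and parameter names are illustrative):

```python
import random

def backoff_delays(base=5.0, cap=300.0, attempts=6, seed=None):
    """Exponential back-off with full jitter: delay_i ~ U(0, min(cap, base * 2^i))."""
    rng = random.Random(seed)
    return [rng.uniform(0, min(cap, base * 2 ** i)) for i in range(attempts)]

# Upper bounds grow 5, 10, 20, 40, 80, 160 s (capped at 300 s); the jitter
# avoids thundering herds when many clients reconnect after an outage.
delays = backoff_delays(seed=42)
```

The same curve translates directly to a C# helper driving the flush coroutine's retry delay.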

3) AWS ingestion (Terraform)

This pipeline creates: a Kinesis Data Stream, Firehose → S3 Parquet, a DLQ, KMS, Glue Schema Registry and a validation Lambda.

provider "aws" { region = "us-east-1" }

# KMS key for encryption
resource "aws_kms_key" "firehose" {
  description = "KMS Firehose"
}

# Encrypted S3 bucket (SSE is a separate resource since AWS provider v4)
resource "aws_s3_bucket" "events" {
  bucket = "game-events-lake"
}

resource "aws_s3_bucket_server_side_encryption_configuration" "events" {
  bucket = aws_s3_bucket.events.id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.firehose.key_id
    }
  }
}

# DLQ for Firehose
resource "aws_sqs_queue" "dlq" {
  name                      = "firehose-dlq"
  message_retention_seconds = 1209600
}

# Firehose IAM role
data "aws_iam_policy_document" "firehose_assume" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["firehose.amazonaws.com"]
    }
    actions = ["sts:AssumeRole"]
  }
}
resource "aws_iam_role" "firehose_role" {
  name               = "firehose-events-role"
  assume_role_policy = data.aws_iam_policy_document.firehose_assume.json
}
data "aws_iam_policy_document" "firehose_policy" {
  statement {
    effect = "Allow"
    actions = [
      "s3:PutObject", "s3:GetBucketLocation", "s3:AbortMultipartUpload",
      "kms:Encrypt", "kms:GenerateDataKey"
    ]
    resources = ["${aws_s3_bucket.events.arn}/*", aws_kms_key.firehose.arn]
  }
  statement {
    # Firehose reads from the Kinesis source stream (not PutRecord)
    effect = "Allow"
    actions = [
      "kinesis:DescribeStream", "kinesis:GetShardIterator",
      "kinesis:GetRecords", "kinesis:ListShards"
    ]
    resources = [aws_kinesis_stream.events.arn]
  }
  statement {
    # Needed to call the validation Lambda processor
    effect    = "Allow"
    actions   = ["lambda:InvokeFunction"]
    resources = [aws_lambda_function.validate_schema.arn]
  }
}
resource "aws_iam_role_policy" "firehose_attach" {
  role   = aws_iam_role.firehose_role.id
  policy = data.aws_iam_policy_document.firehose_policy.json
}

# Kinesis Data Stream
resource "aws_kinesis_stream" "events" {
  name             = "game-events-stream"
  shard_count      = 2
  retention_period = 24
}

# Firehose delivery stream
resource "aws_kinesis_firehose_delivery_stream" "to_s3" {
  name        = "events-to-s3"
  destination = "extended_s3"

  kinesis_source_configuration {
    kinesis_stream_arn = aws_kinesis_stream.events.arn
    role_arn           = aws_iam_role.firehose_role.arn
  }

  extended_s3_configuration {
    bucket_arn          = aws_s3_bucket.events.arn
    role_arn            = aws_iam_role.firehose_role.arn
    buffering_size      = 64
    buffering_interval  = 60
    # Parquet is compressed by the serializer itself; Firehose requires
    # UNCOMPRESSED when data format conversion is enabled.
    compression_format  = "UNCOMPRESSED"
    error_output_prefix = "errors/!{firehose:error-output-type}/"

    data_format_conversion_configuration {
      enabled = true
      input_format_configuration {
        deserializer { hive_json_ser_de {} }
      }
      output_format_configuration {
        serializer { parquet_ser_de {} }
      }
      # Conversion requires a Glue table describing the target schema
      # (assumed to exist here).
      schema_configuration {
        database_name = "game_events"
        table_name    = "events"
        role_arn      = aws_iam_role.firehose_role.arn
      }
    }

    # In practice this also needs a partitioned `prefix` and a
    # MetadataExtraction processor to take effect.
    dynamic_partitioning_configuration { enabled = true }

    processing_configuration {
      enabled = true
      processors {
        type = "Lambda"
        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = aws_lambda_function.validate_schema.arn
        }
      }
    }

    s3_backup_mode = "Enabled"
    s3_backup_configuration {
      role_arn   = aws_iam_role.firehose_role.arn
      bucket_arn = aws_s3_bucket.events.arn
      prefix     = "backup/"
    }

    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = "/aws/kinesisfirehose/events"
      log_stream_name = "to-s3"
    }
  }
}

# Glue Schema Registry
resource "aws_glue_registry" "events" {
  registry_name = "game-events-registry"
  description   = "Registry for event schemas"
}
resource "aws_glue_schema" "event_schema" {
  schema_name       = "game_event_schema"
  registry_arn      = aws_glue_registry.events.arn
  data_format       = "JSON"
  compatibility     = "BACKWARD"
  schema_definition = file("schemas/game_event_1_0_0.json")
}

# Validation Lambda
data "aws_iam_policy_document" "lambda_assume" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
    actions = ["sts:AssumeRole"]
  }
}
resource "aws_iam_role" "lambda_role" {
  name               = "validate-game-event-role"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}
resource "aws_iam_role_policy" "lambda_policy" {
  role = aws_iam_role.lambda_role.id
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect   = "Allow",
        Action   = ["glue:GetSchemaVersion","kms:Decrypt"],
        Resource = ["${aws_glue_schema.event_schema.arn}*","${aws_kms_key.firehose.arn}"]
      }
    ]
  })
}
resource "aws_lambda_function" "validate_schema" {
  function_name = "validate_game_event"
  filename      = "validate_schema.zip"
  handler       = "index.handler"
  runtime       = "nodejs18.x"
  role          = aws_iam_role.lambda_role.arn
  environment { variables = { SCHEMA_REGISTRY = aws_glue_registry.events.name } }
}

4) Stream processing

  • Stack: Apache Flink / Kinesis Data Analytics in exactly-once mode.
  • Checkpointing to S3 or a DFS (RocksDB state backend), 10 s interval.
  • Watermarking to handle lateness (e.g. 2 min).
  • Deduplication on event_id: 24 h tumbling window, TTL > 24 h.
  • Storage options: state TTL, state-store size, a Bloom filter to reduce memory (false-positive trade-off).
  • Geo/IP enrichment via MaxMind in a UDF or a parallel job.
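The event_id deduplication step can be prototyped outside Flink before wiring it into keyed state. A minimal in-memory sketch of the same logic — keep each id for a 24 h TTL and drop repeats inside the window (in the real job this lives in RocksDB keyed state with a state TTL, not a Python dict):

```python
from datetime import datetime, timedelta

class Deduplicator:
    """Drop events whose event_id was already seen within the TTL window."""
    def __init__(self, ttl=timedelta(hours=24)):
        self.ttl = ttl
        self.seen = {}  # event_id -> first-seen timestamp

    def accept(self, event_id: str, now: datetime) -> bool:
        # Evict expired entries (Flink does this via state TTL, not a scan).
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.ttl}
        if event_id in self.seen:
            return False
        self.seen[event_id] = now
        return True

d = Deduplicator()
t0 = datetime(2026, 2, 23, 10, 0)
first = d.accept("e1", t0)                        # new id -> kept
dup = d.accept("e1", t0 + timedelta(hours=1))     # repeat inside TTL -> dropped
again = d.accept("e1", t0 + timedelta(hours=25))  # TTL expired -> kept again
```

The Bloom-filter option from the list above replaces the exact `seen` map with a probabilistic membership test, trading a small false-positive rate (dropped unique events) for bounded memory.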

5) Analytical SQL queries

Example: D+1/D+7 retention (BigQuery syntax):

WITH starts AS (
  -- day0 = each user's first session, otherwise every session day
  -- would count as a cohort start and inflate the numbers
  SELECT user_id, MIN(DATE(occurred_at)) AS day0
  FROM events
  WHERE event_name = 'session_start'
  GROUP BY user_id
),
retention AS (
  SELECT
    DATE_DIFF(DATE(e.occurred_at), s.day0, DAY) AS day_diff,
    COUNT(DISTINCT s.user_id) AS users
  FROM events e
  JOIN starts s USING (user_id)
  WHERE DATE_DIFF(DATE(e.occurred_at), s.day0, DAY) IN (1, 7)
  GROUP BY day_diff
)
SELECT * FROM retention;
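The same D+1/D+7 logic can be spot-checked in plain Python on a handful of events before trusting the warehouse query — a small sketch with a hypothetical three-user dataset:

```python
from datetime import date

# (user_id, session date) pairs — illustrative data only
events = [
    ("u1", date(2026, 2, 1)), ("u1", date(2026, 2, 2)),  # returns at D+1
    ("u2", date(2026, 2, 1)), ("u2", date(2026, 2, 8)),  # returns at D+7
    ("u3", date(2026, 2, 1)),                            # never returns
]

# day0 = first session per user, mirroring MIN(DATE(occurred_at))
day0 = {}
for user, day in events:
    day0[user] = min(day0.get(user, day), day)

retained = {1: set(), 7: set()}
for user, day in events:
    diff = (day - day0[user]).days
    if diff in retained:
        retained[diff].add(user)

d1, d7 = len(retained[1]), len(retained[7])  # distinct users per day_diff
```

Dividing each count by the cohort size (here 3) yields the retention rate.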

6) A/B testing and statistics

– Configure a 50/50 flag with your feature-flag solution.
– Use a two-sided test by default, one-sided only with a directional hypothesis.
– For an uplift from 35% to 37%, α = 0.05, power = 0.8:

from statsmodels.stats.power import NormalIndPower
from statsmodels.stats.proportion import proportion_effectsize

power, alpha = 0.8, 0.05
p0, p1 = 0.35, 0.37
analysis = NormalIndPower()
n = analysis.solve_power(
  # effect size lives in statsmodels.stats.proportion, not on NormalIndPower
  effect_size=proportion_effectsize(p1, p0),
  power=power, alpha=alpha, ratio=1
)
print(int(n))  # users per variant

– Sequential testing: use alpha spending (libraries such as seqdesign or alphaspending) and define the stopping rules before the experiment starts.
– Adjust for multiple KPIs: Bonferroni, FDR.
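To make alpha spending concrete: the Lan-DeMets approximation of the O'Brien-Fleming boundary spends the type-I error budget as α(t) = 2·(1 − Φ(z_{α/2}/√t)) at information fraction t. A short sketch computing the cumulative alpha spent at four interim looks (the formula is standard; the look schedule is illustrative):

```python
from scipy.stats import norm

def obf_alpha_spent(t: float, alpha: float = 0.05) -> float:
    """Cumulative type-I error spent at information fraction t (0 < t <= 1),
    Lan-DeMets spending function for the O'Brien-Fleming boundary."""
    z = norm.ppf(1 - alpha / 2)
    return 2 * (1 - norm.cdf(z / t ** 0.5))

looks = [0.25, 0.5, 0.75, 1.0]
spent = [obf_alpha_spent(t) for t in looks]
# Early looks spend almost no alpha, so peeking at 25% of the sample barely
# costs anything; the full 0.05 budget is only reached at the final analysis.
```

This is why pre-registering the look schedule matters: the spending function fixes how much evidence each peek is allowed to consume.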


7) Observability, security and GDPR

  • OpenTelemetry instrumentation (client→collector traces, correlated by event_id).
  • PII masking with AWS Macie or a DLP Lambda on S3.
  • Consent that is persisted, revocable and timestamped.
  • Automatic purge/anonymization with Lambda + EventBridge according to legal retention periods.
  • Pipeline monitoring: collector latency, Kinesis lag, error/DLQ rates, cost in €/1,000 events.
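On the GDPR side, the backend can re-derive the same pseudonym the Unity client's ComputeHmac produces, so raw ids never need to leave the device or be stored server-side. A minimal sketch — the secret handling is illustrative; in production the key would live in KMS or Secrets Manager:

```python
import base64
import hashlib
import hmac

def pseudonymize(raw_id: str, secret: str) -> str:
    """HMAC-SHA256 pseudonymization mirroring the client-side ComputeHmac:
    same secret -> same stable pseudonym, irreversible without the key."""
    digest = hmac.new(secret.encode(), raw_id.encode(), hashlib.sha256).digest()
    return base64.b64encode(digest).decode()

# Stable join key across sessions without storing the raw device id.
pid = pseudonymize("device-1234", "votre_secret_salt")
```

Rotating (or deleting) the key is then an anonymization lever: once the secret is gone, pseudonyms can no longer be linked back to raw ids.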

You now have a scalable, fault-tolerant, GDPR-compliant pipeline, ready to support your core mechanics and your A/B experiments.

Damien Larquey

Author at Codolie