Data Engineering 11/26/2024 5 min read

Replaying Lost GA4 Data: Building a Serverless Backfill Pipeline from BigQuery Raw Events with Cloud Run


You've invested significant effort into building a sophisticated server-side Google Analytics 4 (GA4) pipeline. Your Google Tag Manager (GTM) Server Container, hosted on Cloud Run, acts as the central hub for data collection, enrichment, transformation, and granular consent management. Furthermore, you've established a raw event data lake in BigQuery, capturing every server-side event in its pristine form.

This architecture provides unparalleled control, accuracy, and compliance, forming the backbone of your modern analytics strategy. However, even the most robust systems can encounter issues:

  • Temporary Outages: GA4 or other downstream platforms might experience brief outages, causing events to be lost during the downtime.
  • Misconfigurations: A faulty GTM Server Container custom template or tag might accidentally prevent certain GA4 events from firing correctly for a period.
  • New Requirements: A new GA4 property or an additional analytics tool needs historical data that was never sent, or you discover a valuable dimension you wish you had been sending all along.
  • Data Re-enrichment/Correction: You discover an error in your server-side enrichment logic (e.g., incorrect customer_segment assignment) and need to re-process historical events with the corrected logic.

The core problem is the need for a mechanism to replay or backfill lost or incorrectly processed GA4 events from your raw server-side data lake. Relying solely on client-side tracking for recovery is often impossible for historical data, leaving gaps in your analytics and compromising data integrity.

The Problem: When Your Data Pipeline Hits a Snag

Even with a comprehensive server-side setup, the path from user interaction to GA4 insights isn't always perfectly smooth. Consider these scenarios:

  1. GA4 Measurement Protocol Failures: Your GTM Server Container successfully processes an event, but the HTTP call to the GA4 Measurement Protocol fails (e.g., due to a temporary GA4 API issue or network timeout). The event is logged in your raw data lake, but it never reaches GA4.
  2. GTM SC Logic Errors: A bug in your custom GTM SC template for a purchase event causes the GA4 tag to fire incorrectly or not at all for a few hours. Your raw data lake has the correct incoming events, but GA4 is missing conversions.
  3. New Data Requirements: You decide to start tracking a new custom_dimension in GA4 for all historical page_view events. The data existed in your raw payloads, but wasn't mapped.

In all these cases, you have the "source of truth" in your BigQuery raw event data lake. The challenge is effectively extracting this data, applying the necessary (and potentially corrected) transformations, and reliably sending it to GA4 as if it were a new event.

The Solution: A Serverless GA4 Event Replay Pipeline

Our solution leverages the immutable record of your raw server-side events in BigQuery to build a serverless, event-driven replay pipeline. This pipeline will read historical raw events, reconstruct valid GA4 Measurement Protocol hits, and send them to your GA4 property, filling in data gaps and correcting past errors.

This approach provides:

  • Data Integrity: Recover lost events, ensuring a complete historical record in GA4.
  • Flexibility: Re-process events with updated logic or for new tracking requirements.
  • Resilience: Your raw data lake acts as a robust backup, enabling recovery from downstream system failures.
  • Serverless Efficiency: Utilize Cloud Run for event processing and Pub/Sub for scalable orchestration.

Architecture: From Raw Data Lake to GA4 Revival

The replay pipeline consists of BigQuery as the source of truth, Pub/Sub for message brokering and batching, and a Cloud Run service acting as the "Replay Engine," which reconstructs and dispatches events to GA4, as sketched in the diagram below.

graph TD
    subgraph Raw Data & Orchestration
        A[BigQuery Raw Event Data Lake] -->|1. Query Historical Events| B(BigQuery Query Service);
        B -->|2. Export/Stream Query Results| C(Cloud Scheduler/Dataflow Job);
        C -->|3. Publish Raw Event Payload to Pub/Sub| D(Google Cloud Pub/Sub Topic);
    end

    subgraph Replay Engine
        D -->|4. Pub/Sub Push Subscription| E(GA4 Event Replay Service on Cloud Run);
        E -->|5. Extract/Transform Raw Payload| F{Python Logic: Reconstruct GA4 MP Hit};
        F -->|6. Send to GA4 Measurement Protocol| G[Google Analytics 4];
    end

    G --> H[GA4 Reports & Explorations];

Key Flow:

  1. Identify Missing/Incorrect Data: You determine a time range and criteria for events that need replaying (e.g., event_name = 'purchase' during a specific outage, or all page_view events that need a new custom dimension).
  2. Query BigQuery: A BigQuery query selects the raw event payloads (from your raw_incoming_events table) that match your criteria.
  3. Orchestration:
    • For smaller batches, you might manually export the query results or trigger a Cloud Run service directly.
    • For larger or recurring backfills, a Cloud Scheduler job can trigger a Dataflow job or publish messages to a Pub/Sub topic, where each message contains a raw event's payload or a reference to it.
  4. Pub/Sub & Cloud Run: We'll use Pub/Sub to pass each raw event payload to a dedicated GA4 Event Replay Service on Cloud Run; a sketch of the message shape exchanged here follows this list.
  5. Replay Engine (Cloud Run): This Python service receives a raw event, extracts the necessary fields (e.g., event_name, timestamp, client_id, value, items), constructs a valid GA4 Measurement Protocol payload, and dispatches it to GA4.
  6. GA4 Ingestion: GA4 receives the replayed events and processes them using the provided timestamp_micros and _eid for accurate historical placement and deduplication.
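
Throughout the rest of this post, the publisher and the Replay Engine exchange one Pub/Sub message per raw event. As a reference point, here is a minimal sketch of that message shape; the field names mirror the BigQuery query in the next section, while the sample values are invented.

import json

# Hypothetical example of one Pub/Sub message published per BigQuery row.
# The replay service base64-decodes the Pub/Sub "data" field and expects this JSON.
replay_message = {
    "processed_event_id": "evt_8f3a2c",                     # deduplication key, becomes _eid
    "event_timestamp": "2024-07-15T14:03:22.511000+00:00",  # original event time (ISO 8601)
    "raw_payload_json": json.dumps({                         # raw payload exactly as captured
        "event_name": "purchase",
        "client_id": "1234567890.1720972800",
        "transaction_id": "T-1001",
        "value": 49.90,
        "currency": "EUR",
    }),
}

# Pub/Sub transports bytes, so publishers send the JSON-encoded dict:
message_bytes = json.dumps(replay_message).encode("utf-8")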

Core Components Deep Dive & Implementation Steps

1. BigQuery: Your Source of Truth

Ensure your raw_incoming_events table in BigQuery (from the data lake blog) contains the full payload as a JSON or STRING type, along with a processed_event_id (STRING) column for deduplication.
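
If you still need to create this table (or want to sanity-check the assumed schema), a minimal sketch using the google-cloud-bigquery client is shown below; the dataset, table, and column names follow the description above and should be adapted to your existing data lake.

# create_raw_events_table.py -- minimal sketch; adjust names and types to your data lake.
from google.cloud import bigquery

client = bigquery.Client(project="YOUR_GCP_PROJECT_ID")

table_id = "YOUR_GCP_PROJECT_ID.raw_events_data_lake.raw_incoming_events"
schema = [
    bigquery.SchemaField("processed_event_id", "STRING", mode="REQUIRED"),  # dedup key (_eid)
    bigquery.SchemaField("event_timestamp", "TIMESTAMP", mode="REQUIRED"),  # original event time
    bigquery.SchemaField("payload", "JSON", mode="NULLABLE"),               # or STRING if you store serialized JSON
]

table = bigquery.Table(table_id, schema=schema)
# Partition by event time so replay queries only scan the days you target.
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY, field="event_timestamp"
)
table = client.create_table(table, exists_ok=True)
print(f"Table ready: {table.full_table_id}")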

Example BigQuery Query to Select Raw Events for Replay:

Let's assume we need to replay purchase events that happened on a specific day where GA4 data was missing, and we want to ensure they include a specific user_id if available in the raw payload.

SELECT
    raw.processed_event_id,
    raw.event_timestamp,
    JSON_EXTRACT(raw.payload, '$') AS raw_payload_json -- Extract full JSON object
    -- You can also extract specific fields if your replay service needs them pre-parsed
    -- JSON_EXTRACT_SCALAR(raw.payload, '$.event_name') AS event_name_from_raw,
    -- JSON_EXTRACT_SCALAR(raw.payload, '$.transaction_id') AS transaction_id_from_raw,
    -- JSON_EXTRACT_SCALAR(raw.payload, '$.user_id') AS user_id_from_raw
FROM
    `your_gcp_project.raw_events_data_lake.raw_incoming_events` AS raw
WHERE
    DATE(raw.event_timestamp) = '2024-07-15' -- Target specific day of outage/misconfiguration
    AND JSON_EXTRACT_SCALAR(raw.payload, '$.event_name') = 'purchase' -- Filter for specific event type
    -- Optional: Further filter criteria based on your specific use case
    -- AND NOT EXISTS (SELECT 1 FROM `your_gcp_project.analytics_YOUR_GA4_PROPERTY_ID.events_*` AS ga4 WHERE ga4._TABLE_SUFFIX = '20240715' AND (SELECT value.string_value FROM UNNEST(ga4.event_params) WHERE key = '_eid') = raw.processed_event_id)
    -- The above line attempts to find events in GA4 export that match the raw event, filtering out those already present.
    -- This can be complex and expensive; usually, relying on GA4's _eid deduplication is simpler for replays.
;

This query will give you the processed_event_id, event_timestamp, and the raw_payload_json for each event you want to replay.
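
Before publishing anything, it is worth dry-running the query to see how much data it will scan, since that feeds directly into the cost considerations discussed later. A minimal sketch:

# dry_run_replay_query.py -- estimate scanned bytes before running the backfill query.
from google.cloud import bigquery

client = bigquery.Client(project="YOUR_GCP_PROJECT_ID")

replay_query = """
SELECT raw.processed_event_id, raw.event_timestamp, JSON_EXTRACT(raw.payload, '$') AS raw_payload_json
FROM `YOUR_GCP_PROJECT_ID.raw_events_data_lake.raw_incoming_events` AS raw
WHERE DATE(raw.event_timestamp) = '2024-07-15'
  AND JSON_EXTRACT_SCALAR(raw.payload, '$.event_name') = 'purchase'
"""

job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(replay_query, job_config=job_config)
print(f"This replay query would scan {job.total_bytes_processed / 1e9:.2f} GB")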

2. Google Cloud Pub/Sub Setup

Create a Pub/Sub topic to enqueue your raw event payloads for the replay service.

gcloud pubsub topics create ga4-replay-events-topic --project YOUR_GCP_PROJECT_ID

3. GA4 Event Replay Service (Cloud Run - Python)

This Cloud Run service will receive messages from Pub/Sub, reconstruct the GA4 Measurement Protocol payload, and send it to GA4.

replay-service/main.py:

import base64
import datetime
import json
import logging
import os

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# GA4 Measurement Protocol Configuration
GA4_API_SECRET = os.environ.get('GA4_API_SECRET') # Your GA4 Measurement Protocol API Secret
GA4_MEASUREMENT_ID = os.environ.get('GA4_MEASUREMENT_ID') # Your GA4 Measurement ID (G-XXXXXXXXXX)
GA4_ENDPOINT = f"https://www.google-analytics.com/mp/collect?measurement_id={GA4_MEASUREMENT_ID}&api_secret={GA4_API_SECRET}"
DEBUG_GA4_ENDPOINT = f"https://www.google-analytics.com/debug/mp/collect?measurement_id={GA4_MEASUREMENT_ID}&api_secret={GA4_API_SECRET}"

@app.route('/replay-event', methods=['POST'])
def replay_event():
    """
    Receives Pub/Sub push messages, decodes them, reconstructs GA4 Measurement Protocol hit,
    and sends to GA4.
    """
    if not request.is_json:
        logger.warning("Replay Service: Request is not JSON. Content-Type: %s", request.headers.get('Content-Type'))
        return jsonify({'error': 'Request must be JSON'}), 400

    # Initialize these up front so the error handlers below can log them even
    # when parsing fails before they are assigned.
    unified_event_id = None
    decoded_data = None

    try:
        envelope = request.get_json()
        if not envelope or 'message' not in envelope:
            logger.error("Replay Service: Invalid Pub/Sub message format.")
            return jsonify({'error': 'Invalid Pub/Sub message format'}), 400

        message = envelope['message']
        if 'data' not in message:
            logger.error("Replay Service: Pub/Sub message data missing.")
            return jsonify({'error': 'Pub/Sub message data missing'}), 400

        # Pub/Sub message data is base64 encoded
        decoded_data = base64.b64decode(message['data']).decode('utf-8')
        raw_event_data_dict = json.loads(decoded_data)
        
        # Extract the BigQuery row's relevant fields
        unified_event_id = raw_event_data_dict.get('processed_event_id')
        event_timestamp_str = raw_event_data_dict.get('event_timestamp') # Should be ISO format
        raw_payload_json_str = raw_event_data_dict.get('raw_payload_json')

        if not unified_event_id or not event_timestamp_str or not raw_payload_json_str:
            logger.error(f"Replay Service: Missing critical fields in message for {unified_event_id}. Skipping.")
            return jsonify({'error': 'Missing critical fields in message'}), 400

        # Parse the raw_payload_json back into a dictionary
        raw_payload = json.loads(raw_payload_json_str)

        # --- Reconstruct GA4 Measurement Protocol Payload ---
        # This part mimics how your GTM Server Container would build a GA4 MP hit.
        # You'll need to adapt this logic to match your specific GTM SC transformations.
        # This is the most critical part where you "replay" your GTM SC logic.

        event_name_from_payload = raw_payload.get('event_name', 'unknown_replayed_event')
        client_id_from_payload = raw_payload.get('_event_metadata', {}).get('client_id', raw_payload.get('client_id'))
        
        if not client_id_from_payload:
            # Note: Pub/Sub retries any non-2xx response. For permanently malformed
            # messages, consider returning 2xx (ack and drop) after logging, or
            # routing them to a dead-letter topic, instead of retrying forever.
            logger.error(f"Replay Service: Cannot determine client_id for {unified_event_id}. Skipping GA4 send.")
            return jsonify({'error': 'Missing client_id'}), 400

        # Convert original event timestamp to milliseconds for _et parameter
        event_timestamp_dt = datetime.datetime.fromisoformat(event_timestamp_str.replace('Z', '+00:00')) # Handle Z for UTC
        event_timestamp_ms = int(event_timestamp_dt.timestamp() * 1000)

        # Build parameters - adapt to your raw_payload structure and desired GA4 mapping
        # Example: assuming 'value', 'currency', 'items' are top-level in raw_payload or nested
        ga4_event_params = {
            "debug_mode": True, # Enable debug_mode for testing backfill
            "engagement_time_msec": 1, # Minimal engagement time for replayed events
            "session_id": f"replay_{event_timestamp_dt.strftime('%Y%m%d%H%M%S')}", # Generate a unique session ID for replay
            "_eid": unified_event_id, # Crucial for deduplication in GA4
            "replayed_event": True, # Custom parameter to identify replayed events in GA4
            "original_timestamp": event_timestamp_ms # Store original timestamp if needed
        }
        
        # Example mappings: Adapt these to your actual raw_payload structure and GA4 expectations
        if 'value' in raw_payload:
            ga4_event_params['value'] = float(raw_payload['value'])
        if 'currency' in raw_payload:
            ga4_event_params['currency'] = raw_payload['currency']
        if 'transaction_id' in raw_payload: # Important for GA4 purchase deduplication
            ga4_event_params['transaction_id'] = raw_payload['transaction_id']
        
        # Handle items array for e-commerce events
        if 'items' in raw_payload and isinstance(raw_payload['items'], list):
            ga4_event_params['items'] = []
            for item in raw_payload['items']:
                ga4_event_params['items'].append({
                    "item_id": item.get("item_id"),
                    "item_name": item.get("item_name"),
                    "price": float(item.get("price", 0)),
                    "quantity": int(item.get("quantity", 1))
                    # Add other item-scoped parameters as needed
                })

        # Add any custom dimensions or user properties from your raw_payload if needed
        # e.g., ga4_event_params['user_loyalty_tier'] = raw_payload.get('user_loyalty_tier')

        ga4_payload = {
            "client_id": client_id_from_payload,
            "timestamp_micros": str(event_timestamp_ms * 1000), # Measurement Protocol expects microseconds
            "events": [
                {
                    "name": event_name_from_payload,
                    "params": ga4_event_params
                }
            ]
        }

        # --- Send to GA4 Measurement Protocol ---
        headers = {'Content-Type': 'application/json'}
        mp_endpoint = DEBUG_GA4_ENDPOINT # Use debug endpoint for initial testing
        # mp_endpoint = GA4_ENDPOINT # Switch to production endpoint when confident

        logger.info(f"Replay Service: Sending event '{event_name_from_payload}' (ID: {unified_event_id}) to GA4 MP...")
        
        response = requests.post(mp_endpoint, headers=headers, data=json.dumps(ga4_payload), timeout=10)
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)

        logger.info(f"Replay Service: Successfully replayed event '{event_name_from_payload}' (ID: {unified_event_id}) to GA4 MP. Status: {response.status_code}")
        if "debug" in mp_endpoint:
            logger.info(f"GA4 Debug Response: {response.json()}")
        
        return jsonify({'status': 'acknowledged', 'event_id': unified_event_id}), 200

    except requests.exceptions.Timeout:
        logger.error(f"Replay Service: Timeout when sending event {unified_event_id} to GA4 MP.", exc_info=True)
        return jsonify({'error': 'GA4 MP send timeout'}), 500
    except requests.exceptions.RequestException as req_e:
        logger.error(f"Replay Service: Error sending event {unified_event_id} to GA4 MP: {req_e}. Response: {req_e.response.text if req_e.response else 'N/A'}", exc_info=True)
        return jsonify({'error': f"GA4 MP send failed: {str(req_e)}"}), 500
    except json.JSONDecodeError as json_e:
        logger.error(f"Replay Service: JSON decoding error for event {unified_event_id}: {json_e}. Raw data: {decoded_data}", exc_info=True)
        return jsonify({'error': 'JSON processing error'}), 500
    except Exception as e:
        logger.error(f"Replay Service: Unexpected error processing event {unified_event_id}: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

replay-service/requirements.txt:

Flask
requests

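
Before deploying, you can exercise the service locally by simulating a Pub/Sub push envelope; a rough sketch (the sample payload values are made up):

# local_push_test.py -- simulate a Pub/Sub push request against a locally running replay service.
# Start the service first, e.g.: GA4_API_SECRET=... GA4_MEASUREMENT_ID=G-... python main.py
import base64
import json

import requests

sample_row = {
    "processed_event_id": "evt_test_001",
    "event_timestamp": "2024-07-15T14:03:22+00:00",
    "raw_payload_json": json.dumps({
        "event_name": "purchase",
        "client_id": "1234567890.1720972800",
        "transaction_id": "T-1001",
        "value": 49.90,
        "currency": "EUR",
    }),
}

# Pub/Sub push wraps the payload in an envelope with base64-encoded "data".
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(sample_row).encode("utf-8")).decode("utf-8"),
        "messageId": "1",
    },
    "subscription": "projects/YOUR_GCP_PROJECT_ID/subscriptions/ga4-replay-subscription",
}

resp = requests.post("http://localhost:8080/replay-event", json=envelope, timeout=15)
print(resp.status_code, resp.text)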
Deploy the Replay Service to Cloud Run:

gcloud run deploy ga4-replay-service \
    --source ./replay-service \
    --platform managed \
    --region YOUR_GCP_REGION \
    --no-allow-unauthenticated \
    --set-env-vars GA4_API_SECRET="YOUR_GA4_MP_API_SECRET",GA4_MEASUREMENT_ID="G-YOUR_GA4_MEASUREMENT_ID" \
    --memory 512Mi \
    --cpu 1 \
    --timeout 60s # Allow ample time for processing and API calls

Important:

  • Replace YOUR_GCP_REGION, YOUR_GA4_MP_API_SECRET, and G-YOUR_GA4_MEASUREMENT_ID with your actual values.
  • Security: This service handles sensitive GA4 API credentials. Do NOT use --allow-unauthenticated in production: deploy with --no-allow-unauthenticated and grant the Pub/Sub push identity (explained below) roles/run.invoker on this Cloud Run service. Ideally, GA4_API_SECRET should live in Secret Manager and be retrieved by the service at runtime rather than passed as an environment variable; we keep it as an env var here for brevity, and a minimal Secret Manager sketch follows this list.
  • Grant the Cloud Run service identity roles/logging.logWriter to write logs, and roles/secretmanager.secretAccessor if using Secret Manager.
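
If you do move the API secret into Secret Manager as suggested above, a minimal retrieval sketch might look like the following; the secret name is a placeholder, and google-cloud-secret-manager would need to be added to requirements.txt.

# Sketch: fetch the GA4 API secret from Secret Manager at startup instead of via env var.
# Assumes a secret named "ga4-mp-api-secret" exists and the service identity has
# roles/secretmanager.secretAccessor on it.
import google.auth
from google.cloud import secretmanager

def load_ga4_api_secret() -> str:
    _, project_id = google.auth.default()
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/ga4-mp-api-secret/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("utf-8")

# In main.py, replace the env var lookup with:
# GA4_API_SECRET = load_ga4_api_secret()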

4. Orchestration: Triggering the Replay

For smaller, one-off backfills, you might manually publish messages. For larger or recurring backfills, use Cloud Scheduler and Pub/Sub.

a. Create a Pub/Sub Push Subscription: This subscription will trigger your Cloud Run ga4-replay-service.

gcloud pubsub subscriptions create ga4-replay-subscription \
    --topic ga4-replay-events-topic \
    --push-endpoint=https://ga4-replay-service-YOUR_SERVICE_HASH-YOUR_GCP_REGION.a.run.app/replay-event \
    --push-auth-service-account=YOUR_PUSH_INVOKER_SA@YOUR_GCP_PROJECT_ID.iam.gserviceaccount.com \
    --ack-deadline=60 \
    --message-retention-duration=7d \
    --expiration-period=never \
    --project YOUR_GCP_PROJECT_ID

Important: Because the replay service rejects unauthenticated requests, the subscription pushes with an OIDC token. The service account you pass via --push-auth-service-account needs roles/run.invoker on ga4-replay-service, and the Pub/Sub service agent (service-YOUR_PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com) must be allowed to mint tokens for it (roles/iam.serviceAccountTokenCreator).

b. Method 1: Manual Batch Publishing (for one-off, smaller backfills). A short Python script is enough here; large-scale or recurring jobs are covered in Method 2 below.

# python_publisher.py
import os
import json
from google.cloud import bigquery, pubsub_v1

# Configuration
PROJECT_ID = "YOUR_GCP_PROJECT_ID"
REPLAY_TOPIC_ID = "ga4-replay-events-topic"
GA4_RAW_EVENTS_TABLE = "`YOUR_GCP_PROJECT_ID.raw_events_data_lake.raw_incoming_events`"
TARGET_DATE = "2024-07-15" # Date for events to replay

# Initialize clients
bq_client = bigquery.Client(project=PROJECT_ID)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, REPLAY_TOPIC_ID)

# Query BigQuery for events
query = f"""
    SELECT
        raw.processed_event_id,
        raw.event_timestamp,
        JSON_EXTRACT(raw.payload, '$') AS raw_payload_json
    FROM
        {GA4_RAW_EVENTS_TABLE} AS raw
    WHERE
        DATE(raw.event_timestamp) = '{TARGET_DATE}'
        AND JSON_EXTRACT_SCALAR(raw.payload, '$.event_name') = 'purchase'
"""
print(f"Executing BigQuery query for {TARGET_DATE} events...")
query_job = bq_client.query(query)
results = query_job.result()

print(f"Found {results.total_rows} events to replay for {TARGET_DATE}.")

# Publish each event to Pub/Sub
for row in results:
    message_data = {
        "processed_event_id": row.processed_event_id,
        "event_timestamp": row.event_timestamp.isoformat(),
        "raw_payload_json": row.raw_payload_json
    }
    # Pub/Sub message data must be bytes
    future = publisher.publish(topic_path, json.dumps(message_data).encode('utf-8'))
    print(f"Published event {row.processed_event_id}: {future.result()}")

print("Finished publishing events.")

Run this script from a secure environment (e.g., Cloud Shell or a trusted VM) with an identity that can read the table and run queries (roles/bigquery.dataViewer plus roles/bigquery.jobUser) and publish to the topic (roles/pubsub.publisher).
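
For larger backfills, calling future.result() after every publish serializes the script; the Pub/Sub client library can batch messages for you instead. A small sketch with tuned batch settings (the values are illustrative, not recommendations):

# Illustrative batched publishing for larger backfills.
import json
from concurrent import futures

from google.cloud import pubsub_v1

batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=500,       # send up to 500 messages per batch request
    max_bytes=1024 * 1024,  # ...or up to ~1 MB
    max_latency=1.0,        # ...or after waiting at most 1 second
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
topic_path = publisher.topic_path("YOUR_GCP_PROJECT_ID", "ga4-replay-events-topic")

messages_to_publish = []  # fill with the per-row dicts built in python_publisher.py

publish_futures = [
    publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    for message in messages_to_publish
]

# Block until every message has been accepted by Pub/Sub before the script exits.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)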

c. Method 2: Cloud Scheduler and Dataflow (for large-scale/recurring backfills) For truly large-scale or automated recurring backfills, use Dataflow to read from BigQuery and publish to Pub/Sub. A Cloud Scheduler job could then trigger this Dataflow pipeline. This is beyond a simple blog post's scope but uses the same core components.

5. GA4 Considerations for Replayed Data

  • _eid for Deduplication: The _eid parameter in the Measurement Protocol payload (unified_event_id in our service) is crucial. GA4 uses this to deduplicate events. If you replay an event with an _eid that GA4 has already seen within its deduplication window (typically 30 minutes), it will be ignored. For backfilling missing data, this is perfect.
  • timestamp_micros: This parameter in the Measurement Protocol allows you to specify the original event timestamp, ensuring the replayed event appears chronologically correct in GA4 reports.
  • session_id: For replayed events, you'll likely want to generate a new session_id (as done in the example with replay_ prefix) to clearly distinguish them from naturally occurring sessions. This helps in analysis.
  • Custom Parameter for Replay: Adding a custom event parameter like replayed_event: true allows you to easily filter and segment reports in GA4 to analyze the impact of your backfills or exclude replayed data from certain views.
  • Debug Mode: Use the debug/mp/collect endpoint (DEBUG_GA4_ENDPOINT) during testing to see a detailed response from GA4, including any validation errors, without affecting your production data; a minimal validation sketch follows this list.
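
To sanity-check a reconstructed payload outside the pipeline, you can post it straight to the validation endpoint and inspect the returned validationMessages. A minimal sketch (IDs, secret, and values are placeholders):

# validate_payload.py -- send one reconstructed hit to GA4's validation endpoint.
import json

import requests

MEASUREMENT_ID = "G-YOUR_GA4_MEASUREMENT_ID"
API_SECRET = "YOUR_GA4_MP_API_SECRET"
DEBUG_URL = (
    "https://www.google-analytics.com/debug/mp/collect"
    f"?measurement_id={MEASUREMENT_ID}&api_secret={API_SECRET}"
)

payload = {
    "client_id": "1234567890.1720972800",
    "timestamp_micros": "1721052202511000",
    "events": [{
        "name": "purchase",
        "params": {
            "transaction_id": "T-1001",
            "value": 49.90,
            "currency": "EUR",
            "_eid": "evt_test_001",
            "replayed_event": True,
        },
    }],
}

resp = requests.post(DEBUG_URL, json=payload, timeout=10)
# The validation endpoint returns validationMessages; an empty list means the hit looks valid.
print(json.dumps(resp.json(), indent=2))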

Benefits of This Serverless Replay Pipeline

  • Data Completeness: Fill in gaps in your GA4 data caused by outages or misconfigurations, ensuring a more complete historical view.
  • Data Correctness: Re-process events with corrected logic, improving the accuracy of historical data.
  • Future-Proofing: Leverage your raw data lake to meet new analytics requirements or populate new systems with historical data.
  • Resilience & Auditability: Your raw data lake serves as an invaluable backup, and the replay pipeline offers a controlled way to correct and re-send data.
  • Flexibility: Target specific event types, date ranges, or apply custom logic during replay based on your needs.
  • Cost-Effective: Serverless components (Cloud Run, Pub/Sub) scale on demand, only incurring costs when actively replaying events.

Important Considerations

  • Cost: BigQuery queries, Pub/Sub messages, and Cloud Run invocations incur costs. Optimize BigQuery queries to scan minimal data. For massive backfills, consider Dataflow for efficient processing.
  • Deduplication Strategy: Rely on GA4's _eid deduplication for replayed events. Do not manually filter in your BigQuery query unless you have very specific, complex requirements that GA4's native deduplication cannot handle.
  • PII Handling: Your raw data lake might contain PII. Ensure your Cloud Run replay service maintains your PII scrubbing and consent policies during replay, just as your original GTM SC does. The example assumes PII is handled according to prior policies (e.g., already hashed, or redacted).
  • Timestamp Accuracy: Ensure the timestamp_micros parameter in your Measurement Protocol payload accurately reflects the original event time to prevent data skewing in GA4. Note that Google's Measurement Protocol documentation only allows backdating events by a limited window (on the order of 72 hours), so verify how GA4 dates much older replayed events before committing to a large backfill.
  • GA4 Property ID: Double-check that you are sending events to the correct GA4 Measurement ID for the desired environment (e.g., dev, staging, prod).
  • Measurement Protocol Limits: Be aware of GA4 Measurement Protocol limits on event volume per property. For very large backfills, you may need to spread sends over time; a simple pacing sketch follows this list.
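
One simple way to spread a large backfill over time is to publish in small, spaced-out batches; a rough sketch (batch size and delay are arbitrary and should be tuned to your volumes):

# Illustrative pacing loop: publish replay messages in small batches with a pause between them.
import json
import time

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("YOUR_GCP_PROJECT_ID", "ga4-replay-events-topic")

def publish_paced(rows, batch_size=200, pause_seconds=5):
    """Publish `rows` (dicts as built in python_publisher.py) in paced batches."""
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        batch_futures = [
            publisher.publish(topic_path, json.dumps(row).encode("utf-8")) for row in batch
        ]
        for f in batch_futures:
            f.result()  # wait for this batch to be accepted before moving on
        time.sleep(pause_seconds)  # crude throttle to stay well under downstream limits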

Conclusion

Your server-side GA4 data lake is more than just an audit trail; it's a powerful asset for data recovery and re-processing. By building a serverless event replay pipeline with BigQuery, Pub/Sub, and Cloud Run, you gain the ability to intelligently recover from data loss, correct historical inaccuracies, and adapt to evolving analytics needs. This strategic capability transforms your analytics from a fragile collection system into a resilient, self-healing, and future-proof data powerhouse, ensuring your business always operates on complete and accurate insights.