The gold standard of MLOps: A deep dive into Vertex AI Model observability (Part 2)

Welcome back to our comprehensive series on End-to-End Machine Learning Operations (MLOps). In Part 1, we established the bedrock of our pipeline: we utilized rigorous experiment tracking to train a champion XGBoost model and seamlessly deployed it to a live, highly available Vertex AI endpoint.

But here is the dark truth of machine learning: models do not crash; they degrade silently.

Before getting started, here is the architecture diagram detailing the Part 2 workflow:

And here is the sequence diagram describing the chronological workflow:

When a traditional software application encounters a bug, it throws a 500 Internal Server Error, immediately triggering pagers across the engineering department. When an ML model encounters shifted user demographics or changing macroeconomic trends, it doesn’t fail. It simply processes the unfamiliar data and returns highly inaccurate predictions with absolute confidence.

In Part 2, we are going to build the ultimate MLOps safety net: a Continuous Observability & Remediation Engine. We will meticulously explain how to enable petabyte-scale telemetry, establish sliding-window CRON schedules for drift detection, parse complex BigQuery arrays for Root Cause Analysis (RCA), and architect an event-driven Kubeflow pipeline to autonomously self-heal the system.

Let’s dive deep into the engineering.


1. The omniscient eye: Enabling petabyte-scale telemetry

The first law of MLOps is simple: You cannot monitor what you do not measure. Before we can detect drift, we must instruct our live endpoint to capture the actual HTTP prediction requests and outbound responses.

Using the Vertex AI GAPIC (generated API client) library, we enable a PredictRequestResponseLoggingConfig. By setting sampling_rate=1.0, we instruct the endpoint to capture 100% of live prediction traffic and asynchronously sink it into a BigQuery table. This gives us a queryable audit trail of our model’s behavior in the wild, and because the logging is asynchronous it should not add noticeable latency to the user’s request.

from google.cloud.aiplatform_v1 import (
    EndpointServiceClient, Endpoint,
    PredictRequestResponseLoggingConfig, BigQueryDestination
)
from google.protobuf import field_mask_pb2

# 1. Initialize GAPIC client & define BigQuery destination
client = EndpointServiceClient(client_options={"api_endpoint": f"{LOCATION}-aiplatform.googleapis.com"})
endpoint_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/endpoints/{ENDPOINT_ID}"
bq_uri = f"bq://{PROJECT_ID}.churn_monitoring.endpoint_logs"

# 2. Configure logging to capture 100% of prediction traffic
logging_config = PredictRequestResponseLoggingConfig(
    enabled=True,
    sampling_rate=1.0, 
    bigquery_destination=BigQueryDestination(output_uri=bq_uri)
)

# 3. Execute the endpoint update using a FieldMask
endpoint_proto = Endpoint(name=endpoint_name, predict_request_response_logging_config=logging_config)
update_mask = field_mask_pb2.FieldMask(paths=["predict_request_response_logging_config"])
client.update_endpoint(endpoint=endpoint_proto, update_mask=update_mask)

print(f"✅ Success! Prediction telemetry will now stream to: {bq_uri}")


2. Setting the tripwires: Model Monitor V2 & CRON schedules

With raw data flowing into BigQuery, we must establish mathematical guardrails. Using the highly advanced Vertex AI Model Monitoring V2 SDK, we define an explicit schema for our expected inputs and set strict statistical divergence thresholds.

The mathematics & the schedule

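We configure the monitor to calculate the Jensen-Shannon Divergence for our numerical features. The divergence is a score, not a percentage: 0 means the live and baseline distributions are identical, and larger values mean they differ more. If the score for a feature exceeds our threshold of 0.05, the system will trigger a high-severity alert.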
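
To build intuition for that 0.05 threshold, here is a minimal NumPy sketch of a Jensen-Shannon divergence computed over shared histogram bins. It is an illustration only: the helper js_divergence, the 20-bin choice, the log base, and the standard deviation of 8 for the training ages are all assumptions, and the way Vertex AI bins and approximates the metric internally may differ.

```python
import numpy as np

def js_divergence(baseline, target, bins=20):
    """Jensen-Shannon divergence (log base 2, so 0 <= JSD <= 1) of two samples."""
    baseline = np.asarray(baseline, dtype=float)
    target = np.asarray(target, dtype=float)

    # Use the same bin edges for both samples so the histograms are comparable
    lo = min(baseline.min(), target.min())
    hi = max(baseline.max(), target.max())
    edges = np.linspace(lo, hi, bins + 1)
    p = np.histogram(baseline, bins=edges)[0] / baseline.size
    q = np.histogram(target, bins=edges)[0] / target.size
    m = 0.5 * (p + q)  # mixture distribution

    def kl(a, b):
        mask = a > 0  # empty bins contribute nothing to the sum
        return float(np.sum(a[mask] * np.log2(a[mask] / b[mask])))

    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

rng = np.random.default_rng(seed=0)
train_age = rng.normal(35, 8, 5000)   # assumed training distribution
same_age = rng.normal(35, 8, 5000)    # fresh sample, no drift
young_age = rng.normal(22, 3, 5000)   # drifted, younger audience

print(f"no drift : {js_divergence(train_age, same_age):.4f}")
print(f"drifted  : {js_divergence(train_age, young_age):.4f}")
```

Two samples from the same distribution land well under 0.05, while the younger cohort scores far above it, which is the behaviour the threshold relies on.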

But when does it check, and what does it compare? To prevent false positives caused by daily seasonality (e.g., traffic looking different at 2 AM versus 2 PM), we use a sliding window CRON schedule.

from vertexai.resources.preview import ml_monitoring

# 1. Construct an explicit Model Schema
feature_names = ["user_age", "session_duration"]
field_schemas = [ml_monitoring.spec.FieldSchema(name=name, data_type="float") for name in feature_names]
custom_model_schema = ml_monitoring.spec.ModelMonitoringSchema(feature_fields=field_schemas)

# 2. Define Feature Drift Specifications
FEATURE_DRIFT_SPEC = ml_monitoring.spec.DataDriftSpec(
    numeric_metric_type="jensen_shannon_divergence",
    feature_alert_thresholds={"user_age": 0.05, "session_duration": 0.05},
)

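# 3. Bind a Continuous Hourly Schedule to the Live Endpoint
# (my_model_monitor is assumed to be an existing Model Monitor resource
#  that was created earlier with custom_model_schema)
my_model_monitor.create_schedule(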
    display_name="churn-schedule-v2-live",
    cron="0 * * * *", # Execute exactly at minute 0 of every hour
    
    # BASELINE: The 24-hour window from yesterday
    baseline_dataset=ml_monitoring.spec.MonitoringInput(
        endpoints=[ENDPOINT_RESOURCE_NAME], 
        window="24h", 
        offset="24h" # Shift the lookback window to the previous day
    ),
    
    # TARGET: The 24-hour window from today (Live Traffic)
    target_dataset=ml_monitoring.spec.MonitoringInput(
        endpoints=[ENDPOINT_RESOURCE_NAME], 
        window="24h"
    ),
    
    tabular_objective_spec=ml_monitoring.spec.TabularObjective(feature_drift_spec=FEATURE_DRIFT_SPEC),
    notification_spec=ml_monitoring.spec.NotificationSpec(user_emails=["ml-admin@yourcompany.com"])
)

Understanding the CRON logic:
The parameter cron="0 * * * *" tells Vertex AI to wake up at the top of every hour. It looks at the target_dataset (the last 24 hours of traffic) and compares it against the baseline_dataset (which uses offset="24h" to look at the 24 hours prior to the target). This rolling 24-hour comparison ensures the monitor dynamically adapts to safe, gradual shifts while immediately catching sudden, severe anomalies.
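
The window arithmetic can be sketched in plain Python. This is only a reading of the window and offset semantics as described above, not the service's implementation; parse_hours and monitoring_windows are hypothetical helper names.

```python
from datetime import datetime, timedelta

def parse_hours(spec: str) -> timedelta:
    """Convert a string such as '24h' into a timedelta."""
    if not spec.endswith("h"):
        raise ValueError(f"expected a value like '24h', got {spec!r}")
    return timedelta(hours=int(spec[:-1]))

def monitoring_windows(run_time: datetime, window: str = "24h", baseline_offset: str = "24h"):
    """Return the (start, end) bounds of the target and baseline windows for one run."""
    width = parse_hours(window)
    target = (run_time - width, run_time)
    # The baseline has the same width but is shifted back by the offset
    baseline_end = run_time - parse_hours(baseline_offset)
    baseline = (baseline_end - width, baseline_end)
    return {"target": target, "baseline": baseline}

# Example: the run that fires at 14:00 on 2 May
windows = monitoring_windows(datetime(2024, 5, 2, 14, 0))
for name, (start, end) in windows.items():
    print(f"{name:8s} {start:%Y-%m-%d %H:%M} -> {end:%Y-%m-%d %H:%M}")
```

Each hourly run slides both windows forward by one hour, so the baseline always ends exactly where the target begins.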


3. Injecting chaos: Simulating a data disaster

To test the resilience of our observability engine, we programmatically blast the live endpoint with 1,000 prediction requests.

Crucially, we inject a massive demographic shift. While our original XGBoost model was trained on an audience centered around age 35, we manipulate the live payload generation to simulate a sudden surge of younger users (ages 18-25).

import numpy as np

drifted_instances = []
for _ in range(1000):
    # Placeholder values for the remaining, non-drifting features
    other = np.random.randn(8).tolist()
    instance = {
        # Simulate the younger demographic causing the drift (Mean age = 22)
        "user_age": float(np.random.normal(22, 3)),
        "session_duration": float(np.random.normal(12, 4)),
        "f2": other[0], "f3": other[1], "f4": other[2], "f5": other[3]
        # ... remaining features
    }
    drifted_instances.append(instance)

# Fire the skewed traffic at the live endpoint
# (endpoint is assumed to be the aiplatform.Endpoint object deployed in Part 1)
for i in range(0, 1000, 100):
    batch_dicts = drifted_instances[i:i+100]
    batch_numeric = [[inst[f] for f in feature_names + ['f2','f3','f4','f5']] for inst in batch_dicts]
    endpoint.predict(instances=batch_numeric)
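
Because the payload is sent as positional arrays, the column order is the only thing that ties a value to its feature. The sketch below factors the ordering and batching into two small helpers and fails loudly when a feature is missing. The helper names and the FEATURE_ORDER constant are illustrative assumptions, and no endpoint call is made.

```python
FEATURE_ORDER = ["user_age", "session_duration", "f2", "f3", "f4", "f5"]

def to_ordered_rows(instances, feature_order):
    """Turn dict instances into positional rows, failing loudly on missing features."""
    rows = []
    for i, inst in enumerate(instances):
        missing = [f for f in feature_order if f not in inst]
        if missing:
            raise KeyError(f"instance {i} is missing features: {missing}")
        rows.append([float(inst[f]) for f in feature_order])
    return rows

def batches(rows, batch_size):
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

# Small synthetic example
demo_instances = [
    {"user_age": 22 + i % 3, "session_duration": 12.0, "f2": 0.1, "f3": 0.2, "f4": 0.3, "f5": 0.4}
    for i in range(250)
]
demo_rows = to_ordered_rows(demo_instances, FEATURE_ORDER)
print([len(b) for b in batches(demo_rows, 100)])
```

Keeping user_age at index 0 matters later, since the SQL in the root cause analysis falls back to that position.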

4. The “PagerDuty” moment: The automated alert

When the clock strikes the top of the hour, Vertex AI executes the CRON job, analyzes the BigQuery sink, and calculates the divergence. Recognizing the shift far exceeds our 0.05 threshold, the platform instantly dispatches an automated incident response email.

Here is exactly what that alert looks like when it hits the Data Scientist’s inbox:

📩 Subject: Vertex AI Model Monitoring Job anomalies detected

Hello Vertex AI Customer,
You are receiving this mail because you are subscribing to the Vertex AI Model Monitoring service. This mail is to inform you that a model monitoring job has detected anomalies.

Basic Information:

  • Model Monitoring Job: projects/.../locations/us-central1/modelMonitors/...
  • Model: projects/.../models/.../versions/1

Feature Drift Anomalies:

Feature name     | Anomaly Value | Anomaly Threshold | Summary
session_duration | 0.362824      | 0.05              | The approximate Jensen-Shannon divergence is 0.362824, above the threshold 0.050000.
user_age         | 0.474969      | 0.05              | The approximate Jensen-Shannon divergence is 0.474969, above the threshold 0.050000.

With user_age at roughly 9.5x our allowable limit and session_duration at roughly 7x, the system has clearly caught the simulated younger demographic.
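
For reference, the arithmetic behind that comparison, using the values from the alert email above:

```python
# Anomaly values and threshold copied from the alert email
anomalies = {"session_duration": 0.362824, "user_age": 0.474969}
threshold = 0.05

# How many times over the threshold each feature is
ratios = {name: value / threshold for name, value in anomalies.items()}
for name, ratio in sorted(ratios.items(), key=lambda item: item[1], reverse=True):
    print(f"{name}: {ratio:.1f}x the threshold")
```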




5. The detective work: Root cause analysis via SQL

An alert tells you what happened, but an engineer needs to know why it happened. Because our endpoint logs all JSON payloads directly to BigQuery, we must parse this data to isolate the failing cohort.

BigQuery stores the logged request payloads in an ARRAY<STRING> column. To query it, we use BigQuery’s UNNEST operator to flatten the payload arrays into one row per payload. We use COALESCE as a safety net: it first attempts to extract the feature by its JSON key ($.user_age), but if the payload was sent as a raw numerical array, it falls back to the exact index position ($[0]).

from google.cloud import bigquery
bq_client = bigquery.Client(project=PROJECT_ID)

# SQL Extraction: Robustly handle ARRAY<STRING> schema using UNNEST and COALESCE
sql_extraction = f"""
  SELECT
    CAST(
      COALESCE(
        JSON_EXTRACT_SCALAR(payload, '$.user_age'), -- Attempt to extract via Named Key
        JSON_EXTRACT_SCALAR(payload, '$[0]')        -- Fallback to Index 0 (Numerical Array)
      ) AS FLOAT64
    ) AS user_age
  FROM `{PROJECT_ID}.churn_monitoring.endpoint_logs`,
  UNNEST(request_payload) AS payload 
  WHERE payload IS NOT NULL
"""

live_payload_df = bq_client.query(sql_extraction).to_dataframe().dropna()
print(f"Extracted {len(live_payload_df)} live prediction payloads.")
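
The key-then-index fallback can be mirrored in plain Python, which is handy for unit-testing the parsing logic without touching BigQuery. This is a sketch under the same assumption as the query, namely that user_age sits at index 0 of array payloads; extract_user_age is a hypothetical helper and not part of any Google library.

```python
import json
from typing import Optional

def extract_user_age(payload: str) -> Optional[float]:
    """Return user_age from a JSON payload, by key first and by position second."""
    try:
        parsed = json.loads(payload)
    except (TypeError, ValueError):
        return None  # not valid JSON (or not a string at all)

    if isinstance(parsed, dict):
        value = parsed.get("user_age")   # named-key payload
    elif isinstance(parsed, list) and parsed:
        value = parsed[0]                # positional payload, user_age at index 0
    else:
        value = None

    try:
        return float(value)
    except (TypeError, ValueError):
        return None  # missing or non-numeric value

samples = ['{"user_age": 22.5, "session_duration": 12}', "[21.0, 12.0, 0.3]", "[]", "not json"]
print([extract_user_age(s) for s in samples])
```

Payloads that match neither shape come back as None, much like the NULL rows that the dropna() call removes above.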

6. Mathematical verification: SciPy & Seaborn

Before triggering a costly retraining pipeline, a Data Scientist must rigorously verify the anomaly. Using the live BigQuery data we just extracted, we execute a mathematical test directly in our notebook.

Using the Kolmogorov-Smirnov (K-S) test via scipy.stats, we compare the live serving data against our training baseline, visualizing the precise shape-shift with Seaborn.

from scipy.stats import ks_2samp
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

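# 1. Perform 2-Sample Kolmogorov-Smirnov Test
# training_data: 1-D array of user_age values from the training set
# serving_data: the live values extracted above, e.g. live_payload_df["user_age"]
stat, p_value = ks_2samp(training_data, serving_data)

print(f"KS Statistic: {stat:.4f}")
print(f"P-Value: {p_value:.4e}")
if p_value < 0.05:
    print("RESULT: Significant drift mathematically proven! (p < 0.05)")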

# 2. Plot the Distribution Comparison
plt.figure(figsize=(12, 6))
sns.kdeplot(training_data, fill=True, label='Training Baseline (Age)', color='#1f77b4', alpha=0.3, linewidth=2)
sns.kdeplot(serving_data, fill=True, label='Serving Production (Age)', color='#ff7f0e', alpha=0.3, linewidth=2)

plt.title("⚠️ High-Severity Drift Detected: Feature 'user_age'", fontsize=16, fontweight='bold', color='#d62728')
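# Label the means from the data instead of hardcoding them
plt.axvline(np.mean(training_data), color='#1f77b4', linestyle='--', label=f'Training Mean: {np.mean(training_data):.0f}')
plt.axvline(np.mean(serving_data), color='#ff7f0e', linestyle='--', label=f'Serving Mean: {np.mean(serving_data):.0f}')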
plt.legend()
plt.show()

Output:

KS Statistic: 0.3890
P-Value: 7.1129e-68
RESULT: Significant drift mathematically proven! (p < 0.05)

This granular statistical evidence makes a false positive extremely unlikely. Because p ≪ 0.05, we can reject the hypothesis that the training and serving ages come from the same distribution: our production customer base has skewed younger. The model has seen few examples of this demographic during training, so its accuracy on these users is at serious risk.
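
To make the K-S statistic less of a black box: it is the largest vertical gap between the two empirical cumulative distribution functions. The NumPy sketch below computes only that statistic, not the p-value. The helper name ks_statistic and the synthetic samples (including the standard deviation of 8 for the training ages) are assumptions for illustration.

```python
import numpy as np

def ks_statistic(sample_a, sample_b):
    """Two-sample K-S statistic: the maximum gap between the two empirical CDFs."""
    a = np.sort(np.asarray(sample_a, dtype=float))
    b = np.sort(np.asarray(sample_b, dtype=float))
    points = np.concatenate([a, b])  # the gap can only change at observed values
    cdf_a = np.searchsorted(a, points, side="right") / a.size
    cdf_b = np.searchsorted(b, points, side="right") / b.size
    return float(np.max(np.abs(cdf_a - cdf_b)))

rng = np.random.default_rng(seed=1)
training_ages = rng.normal(35, 8, 2000)   # assumed training distribution
serving_ages = rng.normal(22, 3, 2000)    # drifted serving distribution

print(f"KS statistic (drifted): {ks_statistic(training_ages, serving_ages):.4f}")
print(f"KS statistic (same)   : {ks_statistic(training_ages, training_ages):.4f}")
```

A value near 0 means the two curves almost overlap, and a value near 1 means the samples barely share any range.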

[GIF: Google Cloud Vertex AI monitoring dashboard for a machine learning model, showing data trends and performance metrics as line graphs]

7. The cure: A true self-healing Kubeflow Pipeline

A mature MLOps platform doesn’t just identify issues—it autonomously resolves them.

When Vertex AI detects drift, an alert event is published to a Google Cloud Pub/Sub topic, which in turn triggers a Vertex AI Pipeline (written using the Kubeflow Pipelines SDK, kfp).
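
The receiving side of that trigger might look roughly like the sketch below. The outer envelope follows the standard Pub/Sub push format (a base64-encoded message.data field), but the inner alert schema, with its feature_drift mapping, is purely hypothetical: inspect the messages your topic actually receives before relying on any field name. The two function names are also illustrative.

```python
import base64
import json

def decode_pubsub_message(envelope: dict) -> dict:
    """Decode the base64 JSON body of a Pub/Sub push envelope."""
    raw = base64.b64decode(envelope["message"]["data"])
    return json.loads(raw.decode("utf-8"))

def features_needing_retraining(alert: dict, threshold: float = 0.05) -> list:
    """Return the drifted feature names, assuming a hypothetical 'feature_drift' mapping."""
    drift = alert.get("feature_drift", {})
    return sorted(name for name, value in drift.items() if value > threshold)

# Build a fake envelope the way Pub/Sub would deliver it
fake_alert = {"feature_drift": {"user_age": 0.474969, "session_duration": 0.362824, "f2": 0.01}}
fake_envelope = {
    "message": {"data": base64.b64encode(json.dumps(fake_alert).encode("utf-8")).decode("ascii")}
}

drifted = features_needing_retraining(decode_pubsub_message(fake_envelope))
if drifted:
    print(f"Would submit the remediation pipeline for: {drifted}")
```

In a real handler, the final branch is where the pipeline job would be submitted.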

To execute this securely in the cloud, we use the @dsl.component decorator. This feature packages our Python functions into isolated Docker containers that execute on Google’s serverless infrastructure. Because these components run in isolated containers, we must explicitly inject our required libraries (like xgboost and google-cloud-bigquery) and pass physical data artifacts between them using Google Cloud Storage.

Here is the exact, production-grade Directed Acyclic Graph (DAG) for our self-healing pipeline:

import time
from google.cloud import aiplatform, storage
from kfp import dsl, compiler

# --- 1. CONFIGURATION & ARTIFACT STORAGE ---
BUCKET_NAME = f"{PROJECT_ID}-vertex-remediation-staging"
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root"

# --- 2. DEFINE THE KUBEFLOW COMPONENTS ---

# Step A: Ingest the drifted data from BigQuery
@dsl.component(
    base_image="python:3.10",
    packages_to_install=["google-cloud-bigquery", "pandas", "db-dtypes", "gcsfs"]
)
def ingest_drifted_data(project_id: str, output_csv_path: str) -> str:
    from google.cloud import bigquery
    import pandas as pd
    
    print(f"Connecting to BigQuery in project: {project_id}...")
    client = bigquery.Client(project=project_id)
    
    # Extract the drifted cohort alongside historical data to augment the dataset
    query = f"""
        SELECT 
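            -- Same key-then-index fallback as the RCA query, since payloads may arrive as positional arrays
            CAST(COALESCE(
                JSON_EXTRACT_SCALAR(payload, '$.user_age'),
                JSON_EXTRACT_SCALAR(payload, '$[0]')
            ) AS FLOAT64) AS user_age,
            CAST(COALESCE(
                JSON_EXTRACT_SCALAR(payload, '$.session_duration'),
                JSON_EXTRACT_SCALAR(payload, '$[1]')
            ) AS FLOAT64) AS session_duration,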
            prediction_score AS target
        FROM `{project_id}.churn_monitoring.endpoint_logs`,
        UNNEST(request_payload) AS payload
        WHERE payload IS NOT NULL
    """
    
    print("Executing extraction query...")
    df = client.query(query).to_dataframe()
    
    # Save the augmented dataset to GCS for the next container to use
    df.to_csv(output_csv_path, index=False)
    return output_csv_path


# Step B: Retrain the champion XGBoost Model
from typing import NamedTuple  # needed for the multi-output return annotation below

@dsl.component(
    base_image="python:3.10",
    packages_to_install=["xgboost", "pandas", "scikit-learn", "gcsfs", "google-cloud-storage"]
)
def retrain_xgboost_model(
    data_gcs_path: str, model_output_uri: str
) -> NamedTuple("Outputs", [("new_accuracy", float), ("artifact_uri", str)]):
    from collections import namedtuple
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from google.cloud import storage
    
    print("Loading augmented dataset from GCS...")
    df = pd.read_csv(data_gcs_path)
    X = df[['user_age', 'session_duration']]
    y = df['target'].round() # Simplified binary target for demonstration
    
    # Hold out 20% of the rows so the deployment gate is not scored on training data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    print("Training XGBoost Classifier on updated demographic data...")
    model = xgb.XGBClassifier(eval_metric='logloss')
    model.fit(X_train, y_train)
    
    # Evaluate new accuracy on the held-out rows
    preds = model.predict(X_test)
    new_accuracy = accuracy_score(y_test, preds)
    print(f"Retraining complete. New Accuracy: {new_accuracy}")
    
    # Save model artifact physically to GCS
    model.save_model("model.bst")
    storage_client = storage.Client()
    bucket_name = model_output_uri.replace("gs://", "").split("/")[0]
    # model_output_uri is expected to end with "/" so the file lands inside that folder
    blob_path = "/".join(model_output_uri.replace("gs://", "").split("/")[1:]) + "model.bst"
    
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_path)
    blob.upload_from_filename("model.bst")
    
    # Named outputs let the pipeline read train_task.outputs["new_accuracy"] and ["artifact_uri"]
    outputs = namedtuple("Outputs", ["new_accuracy", "artifact_uri"])
    return outputs(float(new_accuracy), model_output_uri)


# Step C: Evaluate and Dynamically Deploy (THE ZERO-DOWNTIME SWAP)
@dsl.component(
    base_image="python:3.10",
    packages_to_install=["google-cloud-aiplatform"]
)
def evaluate_and_deploy(
    project_id: str, location: str, endpoint_id: str,
    new_accuracy: float, alert_threshold: float, artifact_uri: str
):
    from google.cloud import aiplatform

    if new_accuracy > alert_threshold:
        print(f"✅ Accuracy {new_accuracy} > {alert_threshold}. DEPLOYING...")
        aiplatform.init(project=project_id, location=location)
        
        print("Registering new model artifact to Vertex Model Registry...")
        new_model = aiplatform.Model.upload(
            display_name="churn_xgboost_healed",
            artifact_uri=artifact_uri,
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest"
        )
        
        print("Shifting live traffic to the newly trained model...")
        endpoint = aiplatform.Endpoint(endpoint_id)
        
        # Deploy the healed model to the existing endpoint and route all traffic to it.
        # The previously deployed model stays on the endpoint at 0% traffic until it is undeployed.
        endpoint.deploy(
            model=new_model,
            machine_type="n1-standard-4",
            min_replica_count=1, max_replica_count=1,
            traffic_percentage=100  # Routes 100% of live traffic to the healed model once it is serving
        )
        print("✅ System successfully self-healed. Zero downtime incurred.")
    else:
        print(f"⚠️ Accuracy {new_accuracy} failed to meet threshold. Aborting deployment.")


# --- 3. BUILD THE PIPELINE GRAPH ---
@dsl.pipeline(name="automated-remediation-pipeline")
def auto_remediation_pipeline(
    project_id: str, location: str, endpoint_id: str, alert_threshold: float = 0.80
):
    # Dynamic GCS paths for artifact passing
    staging_dataset = f"gs://{project_id}-vertex-remediation-staging/data/augmented_data.csv"
    staging_model_dir = f"gs://{project_id}-vertex-remediation-staging/models/healed_model/"

    # Step 1: Ingest
    ingest_task = ingest_drifted_data(project_id=project_id, output_csv_path=staging_dataset)
    
    # Step 2: Train (waits for Step 1 to output the CSV path)
    train_task = retrain_xgboost_model(
        data_gcs_path=ingest_task.output, 
        model_output_uri=staging_model_dir
    )
    
    # Step 3: Deploy (waits for Step 2 to return the new accuracy and GCS URI)
    evaluate_and_deploy(
        project_id=project_id,
        location=location,
        endpoint_id=endpoint_id,
        new_accuracy=train_task.outputs["new_accuracy"],
        alert_threshold=alert_threshold,
        artifact_uri=train_task.outputs["artifact_uri"]
    )

# --- 4. COMPILE AND SUBMIT TO VERTEX AI ---
compiler.Compiler().compile(pipeline_func=auto_remediation_pipeline, package_path="remediation_pipeline.yaml")

job = aiplatform.PipelineJob(
    display_name=f"auto-remediation-run-{int(time.time())}",
    template_path="remediation_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "project_id": PROJECT_ID, 
        "location": LOCATION,
        "endpoint_id": ENDPOINT_ID,
        "alert_threshold": 0.80
    }
)
job.submit()
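
One fragile spot in Step B is the string arithmetic on the gs:// URI: it only produces the right object path when model_output_uri ends with a slash. A small helper along the following lines could make that explicit. It is a sketch using only the standard library; split_gcs_uri and model_blob_path are hypothetical names and the bucket in the example is made up.

```python
from urllib.parse import urlparse

def split_gcs_uri(uri: str):
    """Split 'gs://bucket/some/prefix' into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a valid gs:// URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")

def model_blob_path(model_dir_uri: str, filename: str = "model.bst"):
    """Return (bucket, object_path) for a file inside a GCS 'directory'."""
    bucket, prefix = split_gcs_uri(model_dir_uri)
    if prefix and not prefix.endswith("/"):
        prefix += "/"  # tolerate a missing trailing slash
    return bucket, prefix + filename

print(model_blob_path("gs://demo-bucket/models/healed_model/"))
print(model_blob_path("gs://demo-bucket/models/healed_model"))
```

Both calls resolve to the same object path, whereas plain concatenation would turn the second form into models/healed_modelmodel.bst.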

Why this is the undisputed “Gold Standard”:

  1. Isolated Container Logic: Every @dsl.component declares its dependencies through packages_to_install, because each function runs in its own isolated container on Vertex AI Pipelines rather than on a local machine.
  2. Physical Data Handoffs: Instead of passing placeholder string messages, Step 1 physically extracts the BigQuery data via SQL, saves it to a GCS bucket (output_csv_path), and passes that GCS path to Step 2, which uses pandas to read it and train the xgboost model.
  3. The Zero-Downtime Swap (traffic_percentage=100): Step 3 doesn’t just print a success message. It registers the newly generated model.bst artifact in the Model Registry. Then, by calling endpoint.deploy() on the existing endpoint ID and setting the traffic to 100%, Vertex AI spins up the new model behind the scenes, waits for it to become healthy, and only then redirects traffic to it. The previous model keeps serving until that switch, so end users should not see dropped HTTP requests.
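
The traffic shift in point 3 can be pictured as an update to the endpoint's traffic split, a mapping from deployed-model IDs to percentages. The sketch below is a pure-Python model of that bookkeeping, not a call to Vertex AI; the function names and the IDs champion-v1 and healed-v2 are made up for illustration.

```python
def shift_all_traffic(traffic_split: dict, new_deployed_model_id: str) -> dict:
    """Return a new split that sends 100% of traffic to the new deployed model."""
    updated = {model_id: 0 for model_id in traffic_split}
    updated[new_deployed_model_id] = 100
    return updated

def models_safe_to_undeploy(traffic_split: dict) -> list:
    """Deployed models that receive no traffic and can be undeployed to save cost."""
    return sorted(model_id for model_id, pct in traffic_split.items() if pct == 0)

before = {"champion-v1": 100}
after = shift_all_traffic(before, "healed-v2")

print("before:", before)
print("after: ", after)
print("candidates for undeploy:", models_safe_to_undeploy(after))
```

The old model remains deployed at 0% after the swap, so a follow-up step that undeploys it is worth considering once the new model has proven itself.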

Conclusion to Part 2

We have achieved the holy grail of machine learning operations: a closed-loop, self-healing ecosystem. Without ever leaving the Google Cloud environment, we configured petabyte-scale telemetry, established sliding-window CRON monitors, parsed complex payload arrays via BigQuery SQL, visually proved statistical drift with SciPy, and deployed a containerized Kubeflow Pipeline to autonomously cure the model.

This completes the architectural and code-focused workflow of our Gold Standard MLOps series. Here is what has been achieved so far, as shown in the following end-to-end flow diagram and its accompanying sequence diagram.

But a question remains: How does Vertex AI actually calculate that 0.474 drift score to trigger the alert in the first place?

In Part 3 (The Appendix), we will step away from the Python code and take a strict, research-oriented deep dive into the mathematical formulas—specifically the Kolmogorov-Smirnov Test and Jensen-Shannon Divergence—that power these automated intelligence systems.
