Evaluation & Benchmarking
Measuring Performance of Ontology-Enhanced AI Systems
This guide covers evaluation methodologies for ontology-enhanced AI systems, using plant disease diagnosis as the running use case.
Evaluation Framework
1. Performance Metrics
from typing import List, Dict, Tuple, Union
import numpy as np
import pandas as pd
from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
f1_score, confusion_matrix, classification_report,
roc_auc_score, average_precision_score
)
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import seaborn as sns
class OntologyModelEvaluator:
def __init__(self, class_names: List[str]):
self.class_names = class_names
self.metrics = {}
def calculate_metrics(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
y_probs: np.ndarray = None,
prefix: str = ""
) -> Dict[str, float]:
"""Calculate standard classification metrics."""
metrics = {
f"{prefix}accuracy": accuracy_score(y_true, y_pred),
f"{prefix}precision_macro": precision_score(y_true, y_pred, average='macro'),
f"{prefix}recall_macro": recall_score(y_true, y_pred, average='macro'),
f"{prefix}f1_macro": f1_score(y_true, y_pred, average='macro'),
f"{prefix}precision_weighted": precision_score(y_true, y_pred, average='weighted'),
f"{prefix}recall_weighted": recall_score(y_true, y_pred, average='weighted'),
f"{prefix}f1_weighted": f1_score(y_true, y_pred, average='weighted'),
}
        if y_probs is not None:
            try:
                metrics[f"{prefix}roc_auc_ovr"] = roc_auc_score(
                    y_true, y_probs, multi_class='ovr', average='macro'
                )
                # average_precision_score expects one-hot (binarized) labels in
                # the multiclass case, so binarize y_true before scoring.
                y_true_bin = label_binarize(
                    y_true, classes=list(range(len(self.class_names)))
                )
                metrics[f"{prefix}pr_auc"] = average_precision_score(
                    y_true_bin, y_probs, average='macro'
                )
            except ValueError as e:
                print(f"Could not calculate AUC metrics: {e}")
# Store metrics
self.metrics.update(metrics)
return metrics
def plot_confusion_matrix(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
normalize: bool = True,
title: str = 'Confusion Matrix'
) -> plt.Figure:
"""Plot confusion matrix."""
cm = confusion_matrix(y_true, y_pred)
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
fmt = '.2f'
else:
fmt = 'd'
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(
cm,
annot=True,
fmt=fmt,
xticklabels=self.class_names,
yticklabels=self.class_names,
cmap='Blues',
ax=ax
)
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
ax.set_title(title)
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
return fig
def get_classification_report(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
output_dict: bool = False
) -> Union[str, Dict]:
"""Generate classification report."""
return classification_report(
y_true,
y_pred,
target_names=self.class_names,
output_dict=output_dict
)
def save_metrics(self, filepath: str):
"""Save metrics to a JSON file."""
import json
with open(filepath, 'w') as f:
            json.dump(self.metrics, f, indent=2)

2. Ontology-Specific Metrics
class OntologyAwareEvaluator:
def __init__(self, ontology_manager):
self.ontology = ontology_manager
def semantic_similarity(self, pred_class: str, true_class: str) -> float:
"""Calculate semantic similarity between predicted and true classes."""
# Get paths to root for both classes
pred_path = self.ontology.get_path_to_root(pred_class)
true_path = self.ontology.get_path_to_root(true_class)
# Find lowest common ancestor
lca = self._find_lca(pred_path, true_path)
if lca is None:
return 0.0
# Calculate similarity based on path lengths
depth_lca = len(self.ontology.get_path_to_root(lca))
depth_pred = len(pred_path)
depth_true = len(true_path)
# Wu-Palmer similarity
similarity = (2 * depth_lca) / (depth_pred + depth_true)
return similarity
def hierarchical_metrics(
self,
y_true: List[str],
y_pred: List[str]
) -> Dict[str, float]:
"""Calculate hierarchical evaluation metrics."""
if len(y_true) != len(y_pred):
raise ValueError("Length of true and predicted labels must match")
total_similarity = 0.0
correct = 0
for true, pred in zip(y_true, y_pred):
if true == pred:
correct += 1
total_similarity += 1.0
else:
total_similarity += self.semantic_similarity(pred, true)
accuracy = correct / len(y_true)
avg_similarity = total_similarity / len(y_true)
return {
'hierarchical_accuracy': accuracy,
'average_semantic_similarity': avg_similarity,
'hierarchical_distance': 1 - avg_similarity
}
def _find_lca(self, path1: List[str], path2: List[str]) -> str:
"""Find lowest common ancestor between two paths."""
common = set(path1).intersection(set(path2))
if not common:
return None
# Find the LCA by getting the last common node
lca = None
for node in reversed(path1):
if node in common:
lca = node
break
        return lca

Benchmarking Framework
1. Model Comparison
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Any, Optional
import json
from pathlib import Path
class ModelBenchmark:
def __init__(self, benchmark_dir: str = "benchmark_results"):
self.benchmark_dir = Path(benchmark_dir)
self.benchmark_dir.mkdir(exist_ok=True)
self.results = {}
def add_model_result(
self,
model_name: str,
metrics: Dict[str, float],
metadata: Optional[Dict[str, Any]] = None
):
"""Add model results to the benchmark."""
if model_name in self.results:
print(f"Warning: Overwriting results for model '{model_name}'")
self.results[model_name] = {
'metrics': metrics,
'metadata': metadata or {},
'timestamp': pd.Timestamp.now().isoformat()
}
def compare_models(self, metric: str = 'f1_weighted') -> pd.DataFrame:
"""Compare models based on a specific metric."""
comparison = []
for model_name, result in self.results.items():
if metric in result['metrics']:
comparison.append({
'model': model_name,
metric: result['metrics'][metric],
**result['metadata']
})
if not comparison:
raise ValueError(f"No results found for metric: {metric}")
df = pd.DataFrame(comparison).sort_values(metric, ascending=False)
return df
def save_results(self, filename: str = "benchmark_results.json"):
"""Save benchmark results to a file."""
filepath = self.benchmark_dir / filename
with open(filepath, 'w') as f:
json.dump(self.results, f, indent=2)
@classmethod
def load_results(cls, filepath: str) -> 'ModelBenchmark':
"""Load benchmark results from a file."""
filepath = Path(filepath)
benchmark = cls(benchmark_dir=filepath.parent)
with open(filepath, 'r') as f:
benchmark.results = json.load(f)
return benchmark
def plot_metric_comparison(
self,
metrics: List[str],
title: str = "Model Comparison"
) -> plt.Figure:
"""Plot comparison of multiple metrics across models."""
# Prepare data
data = []
for model_name, result in self.results.items():
for metric in metrics:
if metric in result['metrics']:
data.append({
'Model': model_name,
'Metric': metric,
'Score': result['metrics'][metric]
})
if not data:
raise ValueError("No valid metrics found for plotting")
df = pd.DataFrame(data)
# Create plot
plt.figure(figsize=(12, 6))
ax = sns.barplot(
x='Metric',
y='Score',
hue='Model',
data=df,
palette='viridis'
)
plt.title(title)
plt.xticks(rotation=45)
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
        return plt.gcf()

2. Performance Profiling
import time
import psutil
import os
from functools import wraps
from typing import Callable, Any, Dict

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
class PerformanceProfiler:
def __init__(self):
self.metrics = {}
def profile(self, func_name: str = None):
"""Decorator to profile function execution."""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
process = psutil.Process(os.getpid())
# Memory before
mem_before = process.memory_info().rss / 1024 / 1024 # MB
# Time execution
start_time = time.time()
result = func(*args, **kwargs)
elapsed_time = time.time() - start_time
# Memory after
mem_after = process.memory_info().rss / 1024 / 1024 # MB
# CPU usage
cpu_percent = process.cpu_percent(interval=0.1)
# Store metrics
name = func_name or func.__name__
self.metrics[name] = {
'execution_time_seconds': elapsed_time,
'memory_usage_mb': mem_after - mem_before,
'peak_memory_mb': mem_after,
'cpu_percent': cpu_percent,
'timestamp': time.time()
}
return result
return wrapper
return decorator
def get_metrics(self) -> Dict[str, Dict[str, float]]:
"""Get all collected metrics."""
return self.metrics
def to_dataframe(self) -> pd.DataFrame:
"""Convert metrics to a pandas DataFrame."""
if not self.metrics:
return pd.DataFrame()
df = pd.DataFrame.from_dict(self.metrics, orient='index')
df.index.name = 'function'
return df.reset_index()
def plot_performance(self) -> plt.Figure:
"""Plot performance metrics."""
if not self.metrics:
raise ValueError("No metrics collected yet")
df = self.to_dataframe()
fig, axes = plt.subplots(1, 2, figsize=(15, 5))
# Execution time
sns.barplot(
x='function',
y='execution_time_seconds',
data=df,
ax=axes[0],
palette='viridis'
)
axes[0].set_title('Execution Time (seconds)')
axes[0].tick_params(axis='x', rotation=45)
# Memory usage
sns.barplot(
x='function',
y='memory_usage_mb',
data=df,
ax=axes[1],
palette='viridis'
)
axes[1].set_title('Memory Usage (MB)')
axes[1].tick_params(axis='x', rotation=45)
plt.tight_layout()
        return fig

Case Study: Plant Disease Diagnosis
1. Benchmarking Different Models
import torch

def benchmark_models(
models: Dict[str, Any],
test_loader: torch.utils.data.DataLoader,
device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
) -> ModelBenchmark:
"""Benchmark multiple models on the test set."""
benchmark = ModelBenchmark()
evaluator = OntologyModelEvaluator(class_names=test_loader.dataset.classes)
for model_name, model in models.items():
print(f"Evaluating {model_name}...")
# Get predictions
y_true, y_pred, y_probs = [], [], []
model.eval()
with torch.no_grad():
for batch in test_loader:
inputs = batch['image'].to(device)
labels = batch['label'].to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
probs = torch.softmax(outputs, dim=1)
y_true.extend(labels.cpu().numpy())
y_pred.extend(preds.cpu().numpy())
y_probs.append(probs.cpu().numpy())
y_true = np.array(y_true)
y_pred = np.array(y_pred)
y_probs = np.vstack(y_probs)
# Calculate metrics
metrics = evaluator.calculate_metrics(y_true, y_pred, y_probs)
# Add hierarchical metrics if ontology is available
if hasattr(test_loader.dataset, 'get_class_names'):
class_names = test_loader.dataset.get_class_names()
true_classes = [class_names[i] for i in y_true]
pred_classes = [class_names[i] for i in y_pred]
ontology_manager = OntologyManager() # Your ontology manager
ontology_evaluator = OntologyAwareEvaluator(ontology_manager)
hierarchical_metrics = ontology_evaluator.hierarchical_metrics(
true_classes, pred_classes
)
metrics.update(hierarchical_metrics)
# Add to benchmark
benchmark.add_model_result(
model_name=model_name,
metrics=metrics,
metadata={
'num_parameters': sum(p.numel() for p in model.parameters()),
'device': device,
'timestamp': pd.Timestamp.now().isoformat()
}
)
    return benchmark

2. Performance Analysis
def analyze_performance(benchmark: ModelBenchmark):
"""Analyze and visualize benchmark results."""
# Compare models
comparison = benchmark.compare_models(metric='f1_weighted')
# Plot metrics
metrics_to_plot = [
'accuracy',
'f1_weighted',
'average_semantic_similarity',
'hierarchical_accuracy'
]
fig = benchmark.plot_metric_comparison(
metrics=metrics_to_plot,
title="Model Performance Comparison"
)
# Save results
benchmark.save_results("model_benchmark_results.json")
fig.savefig("model_comparison.png", bbox_inches='tight')
    return comparison

Best Practices
1. Evaluation Protocol
- Cross-Validation: Use k-fold cross-validation for small datasets
- Test Set: Maintain a held-out test set for final evaluation
- Baselines: Compare against strong baselines and state-of-the-art models
- Statistical Significance: Perform statistical tests (e.g., McNemar's test on paired predictions) to confirm that improvements over a baseline are unlikely to be due to chance; see the sketch after this list
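For the statistical-significance point above, a common choice when two models are scored on the same test set is McNemar's test on their disagreements. The sketch below is a minimal version based on an exact binomial test; it assumes SciPy is available, and the names y_true, preds_a, and preds_b are placeholders for aligned 1-D label arrays.

import numpy as np
from scipy.stats import binomtest

def mcnemar_exact(y_true: np.ndarray, preds_a: np.ndarray, preds_b: np.ndarray) -> float:
    """Exact McNemar test p-value for two classifiers evaluated on the same samples."""
    correct_a = preds_a == y_true
    correct_b = preds_b == y_true
    # Discordant pairs: samples where exactly one of the two models is correct.
    n_a_only = int(np.sum(correct_a & ~correct_b))
    n_b_only = int(np.sum(~correct_a & correct_b))
    if n_a_only + n_b_only == 0:
        return 1.0  # the models agree on every sample; no evidence of a difference
    # Under the null hypothesis of equal error rates, n_a_only ~ Binomial(n_a_only + n_b_only, 0.5).
    return binomtest(n_a_only, n_a_only + n_b_only, p=0.5, alternative='two-sided').pvalue

A p-value below the chosen threshold (commonly 0.05) suggests the accuracy difference between the two models is unlikely to be explained by chance alone.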
2. Reporting Metrics
- Report multiple averaging schemes (macro, weighted, and micro where relevant), since they respond differently to class imbalance
- Include confidence intervals whenever possible (a bootstrap sketch follows this list)
- Report per-class metrics for imbalanced datasets
- Include computational efficiency metrics (inference time, memory usage)
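For the confidence-interval point above, a nonparametric bootstrap over the test set is a simple, model-agnostic option. The helper below is a sketch under the assumption that y_true and y_pred are the arrays collected in benchmark_models; the function name and the choice of weighted F1 are illustrative.

import numpy as np
from typing import Tuple
from sklearn.metrics import f1_score

def bootstrap_f1_ci(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    n_boot: int = 1000,
    alpha: float = 0.05,
    seed: int = 42
) -> Tuple[float, float]:
    """Percentile bootstrap confidence interval for weighted F1."""
    rng = np.random.default_rng(seed)
    n = len(y_true)
    scores = []
    for _ in range(n_boot):
        idx = rng.integers(0, n, size=n)  # resample test items with replacement
        scores.append(f1_score(y_true[idx], y_pred[idx], average='weighted'))
    lower = float(np.percentile(scores, 100 * alpha / 2))
    upper = float(np.percentile(scores, 100 * (1 - alpha / 2)))
    return lower, upper

The same resampling loop works for any of the metrics computed by OntologyModelEvaluator, including the hierarchical ones.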
3. Error Analysis
def analyze_errors(
model: torch.nn.Module,
test_loader: torch.utils.data.DataLoader,
class_names: List[str],
device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
) -> pd.DataFrame:
"""Analyze model errors on the test set."""
model.eval()
errors = []
with torch.no_grad():
for batch in test_loader:
inputs = batch['image'].to(device)
labels = batch['label'].to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
probs = torch.softmax(outputs, dim=1)
for i in range(len(labels)):
if preds[i] != labels[i]:
errors.append({
'true_label': class_names[labels[i].item()],
'predicted_label': class_names[preds[i].item()],
'confidence': probs[i][preds[i]].item(),
'true_prob': probs[i][labels[i]].item()
})
    return pd.DataFrame(errors)

Next Steps
- Hyperparameter Tuning: Optimize model architectures and training parameters
- Ensemble Methods: Combine multiple models for improved performance
- Deployment Monitoring: Track model performance in production
- Continuous Evaluation: Set up automated evaluation pipelines (see the regression-check sketch after this list)
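As a concrete starting point for the continuous-evaluation item above, a lightweight regression gate can reload the saved benchmark file and fail a scheduled job or CI step when a tracked metric drops below a threshold. The model key, metric name, and threshold below are illustrative assumptions; the default path matches the file written by analyze_performance.

def check_regression(
    results_path: str = "benchmark_results/model_benchmark_results.json",
    model_name: str = "ontology_enhanced_model",  # hypothetical key in the benchmark file
    metric: str = "f1_weighted",
    min_score: float = 0.85
) -> bool:
    """Raise if the tracked metric fell below the threshold; return True otherwise."""
    benchmark = ModelBenchmark.load_results(results_path)
    score = benchmark.results[model_name]['metrics'][metric]
    if score < min_score:
        raise RuntimeError(
            f"{model_name}: {metric}={score:.3f} is below the minimum of {min_score}"
        )
    return True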
References
- Scikit-learn Metrics Documentation
- PyTorch Lightning Metrics (now TorchMetrics)
- Weights & Biases: Experiment tracking and visualization