import copy
import time
import datetime
import numpy as np
import torch
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from abs_art_tabular.core.interfaces.base_attack import BaseAttack
def collect_attack_examples(X, attacked_X, y, original_predict, attacked_predict, n=5):
"""
Собирает примеры успешных атак с метками класса
Args:
X (np.ndarray): Оригинальные данные
attacked_X (np.ndarray): Атакованные данные
y (np.ndarray): Истинные метки
original_predict (np.ndarray): Предсказания до атаки
attacked_predict (np.ndarray): Предсказания после атаки
n (int): Число примеров для возврата
Returns:
dict: Примеры успешных атак, отсортированные по разным стратегиям
"""
successful_indices = np.where(original_predict != attacked_predict)[0]
if len(successful_indices) == 0:
return {"random": [], "minimal_changes": [], "first": []}
    # Absolute per-feature change for each successful attack
diff = np.abs(attacked_X[successful_indices] - X[successful_indices])
feature_changes = np.sum(diff, axis=1)
    # Sort successful attacks by total feature change (ascending)
sorted_indices = successful_indices[np.argsort(feature_changes)]
    # Select indices using three strategies
random_indices = np.random.choice(sorted_indices, min(n, len(sorted_indices)), replace=False)
minimal_change_indices = sorted_indices[:min(n, len(sorted_indices))]
first_n_indices = successful_indices[:min(n, len(successful_indices))]
    # Build the example records
def create_examples(indices):
return [
{
"original": X[i].tolist(),
"adversarial": attacked_X[i].tolist(),
"original_label": int(y[i]),
"attacked_label": int(attacked_predict[i])
}
for i in indices
]
examples = {
"random": create_examples(random_indices),
"minimal_changes": create_examples(minimal_change_indices),
"first": create_examples(first_n_indices)
}
return examples
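
# --- Usage sketch (illustrative) -----------------------------------------------
# A minimal, hedged example of calling collect_attack_examples on synthetic
# data; every array below is a hypothetical stand-in for real model inputs
# and predictions:
#
#     rng = np.random.default_rng(0)
#     X = rng.random((100, 4))
#     attacked_X = X + rng.normal(0, 0.05, X.shape)
#     y = rng.integers(0, 2, 100)
#     original_predict = y.copy()                  # pretend the model was right
#     attacked_predict = rng.integers(0, 2, 100)   # predictions after the attack
#     examples = collect_attack_examples(
#         X, attacked_X, y, original_predict, attacked_predict, n=3)
#     print(len(examples["minimal_changes"]))      # at most 3 records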
class TabularAttack(BaseAttack):
"""
Базовый класс для атак на табличные данные
"""
_art_attack_cls = None
def __init__(self, estimator, attack_config, **params):
"""
Инициализация атаки
Args:
estimator: Объект модели-оценщика
attack_config (dict): Конфигурация атаки
params (dict): Дополнительные параметры
"""
self.estimator = estimator
self.attack_config = attack_config
self.params = params
self.attack = self._art_attack_cls(self.estimator, **self.params)
def run(self, X, y=None):
"""
Запуск атаки
Args:
X (np.ndarray): Входные данные
y (np.ndarray): Целевые значения
Returns:
dict: Результаты атаки
"""
start_time = time.time()
        # Reset the model query counter
if hasattr(self.estimator, "predict_count"):
self.estimator.predict_count = 0
        # Predictions before the attack
original_predict = np.argmax(self.estimator.predict(X), axis=1)
        # Generate adversarial examples
attacked_X = self.generate(X, y)
        # Predictions after the attack
attacked_predict = np.argmax(self.estimator.predict(attacked_X), axis=1)
        total_model_queries = getattr(self.estimator, "predict_count", -1)  # read the counter after the attack
        # Count successful attacks
        successful_attacks = int(np.sum(original_predict != attacked_predict))
        success_rate = successful_attacks / len(X)
        # Compute accuracy metrics (requires ground-truth labels)
        from sklearn.metrics import accuracy_score
        original_acc = accuracy_score(y, original_predict) if y is not None else None
        attacked_acc = accuracy_score(y, attacked_predict) if y is not None else None
        # Summarize per-feature changes
diff = np.abs(attacked_X - X)
max_changes = np.max(diff, axis=0).tolist()
avg_changes = np.mean(diff, axis=0).tolist()
modified_features = (diff > 1e-6).sum(axis=0).tolist()
feature_changes = {
"max": max_changes,
"avg": avg_changes,
"count": modified_features
}
        # Assemble the report data (attack only)
attack_data = {
"attack_type": self.attack_config["type"],
"task_type": "detection",
"attack_params": {
**self.attack_config.get("params", {}),
"feature_changes": feature_changes
},
"original_metrics": {"accuracy": original_acc},
"attacked_metrics": {"accuracy": attacked_acc},
"metric": "accuracy",
"attack_duration": time.time() - start_time,
"total_model_queries": total_model_queries,
"successful_attacks": successful_attacks,
"attack_success_rate": success_rate,
"attack_examples": {}, # Заглушка
"timestamp": datetime.datetime.now().isoformat()
}
        # Collect examples of successful attacks
try:
attack_examples = collect_attack_examples(X, attacked_X, y, original_predict, attacked_predict, n=5)
attack_data["attack_examples"] = attack_examples
        except Exception:
            attack_data["attack_examples"] = {"random": [], "minimal_changes": [], "first": []}
return attack_data
def generate(self, X, y=None):
"""
Генерация вредоносных примеров
Args:
X (np.ndarray): Входные данные
y (np.ndarray): Целевые значения
Returns:
np.ndarray: Атакованные данные
"""
return self.attack.generate(x=X, y=y)
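
# --- Subclassing sketch (illustrative) -------------------------------------------
# Concrete tabular attacks are expected to bind an ART attack class through
# `_art_attack_cls`. `HopSkipJump` is a real ART evasion attack, but wiring it
# up this way is an assumption about how subclasses are meant to look:
#
#     from art.attacks.evasion import HopSkipJump
#
#     class HopSkipJumpTabularAttack(TabularAttack):
#         _art_attack_cls = HopSkipJump
#
#     attack = HopSkipJumpTabularAttack(
#         estimator, {"type": "hop_skip_jump"}, max_iter=10)
#     report = attack.run(X, y)
#     print(report["attack_success_rate"])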
class DetectionAttack(BaseAttack):
"""
Базовый класс для атак на задачи детекции
"""
def __init__(self, estimator, attack_config, **params):
"""
Инициализация атаки на детекцию
Args:
estimator: Объект модели-оценщика
attack_config (dict): Конфигурация атаки
params (dict): Дополнительные параметры
"""
self.estimator = estimator
self.attack_config = attack_config
self.params = params
        self.targeted = params.get("targeted", False)
        # change_class/target_class are consumed here and must not be forwarded
        # to the ART attack constructor, so pop them out of params
        self.change_class = params.pop("change_class", None)
        self.target_class = params.pop("target_class", None)
        self.classes = None
        if self.targeted and (isinstance(self.change_class, str) or isinstance(self.target_class, str)):
            try:
                self.classes = self.estimator.model.classes
            except AttributeError:
                import logging
                logger = logging.getLogger(__name__)
                logger.info(
                    "Attack is set to targeted mode with the change/target class given "
                    "as a string name, but the model does not expose class names; "
                    "falling back to untargeted mode"
                )
                self.targeted = False
                params["targeted"] = False
        self.attack = self._art_attack_cls(self.estimator, **self.params)
def run(self, X, y=None):
"""
Запуск атаки на детекцию
Args:
X (np.ndarray): Входные данные
y (np.ndarray): Целевые значения
Returns:
dict: Результаты атаки
"""
start_time = time.time()
        # Predictions before the attack
original_predict = self.estimator.model.predict(X)
        # Predictions with postprocessing
start_orig_process_time = time.time()
orig_labeled_predict = self.estimator.model.predict(X, postprocess=True)
orig_process_time = time.time() - start_orig_process_time
        # Generate adversarial examples
        if self.targeted:
            y = change_labels_for_attack(y if y is not None else original_predict,
                                         self.change_class, self.target_class, self.classes)
attacked_X = self.generate(X, y)
else:
attacked_X = self.generate(X, None)
        # Predictions after the attack
attacked_predict = self.estimator.model.predict(attacked_X)
        # Predictions with postprocessing
start_attacked_process_time = time.time()
adv_labeled_predict = self.estimator.model.predict(attacked_X, postprocess=True)
attacked_process_time = time.time() - start_attacked_process_time
        # Per-sample mAP of attacked predictions against the original ones
        map_metric = MeanAveragePrecision(backend='faster_coco_eval')
        maps = []
        for i in range(len(X)):
            map_metric.update(preds=[attacked_predict[i]], target=[original_predict[i]])
            maps.append(map_metric.compute()['map'].item())
            map_metric.reset()
        maps = np.array(maps)
        # Count successful attacks (mAP drops to 0.15 or below)
        successful_attacks = int(np.sum(maps <= 0.15))
        success_rate = successful_attacks / len(X)
        # Summarize per-pixel changes
diff = np.abs(attacked_X - X)
max_changes = np.max(diff).tolist()
avg_changes = np.mean(diff).tolist()
modified_features = (diff > 1e-6).sum(axis=(1, 2, 3)).tolist()
feature_changes = {
"max": max_changes,
"avg": avg_changes,
"count": modified_features
}
        # Assemble the report data (attack only)
attack_data = {
"attack_type": self.attack_config["type"],
"attack_params": {
**self.attack_config.get("params", {}),
"feature_changes": feature_changes
},
"metrics": {
"map": {"original": 1, "attacked": maps.mean()},
"inference_time": {"original": orig_process_time, "attacked": attacked_process_time},
},
"original_metrics": {"map": 1},
"attacked_metrics": {"map": maps.mean()},
"metric": "map",
"attack_duration": time.time() - start_time,
"total_model_queries": -1, # TODO
"successful_attacks": successful_attacks,
"attack_success_rate": success_rate,
"attack_examples": {}, # Заглушка
"timestamp": datetime.datetime.now().isoformat()
}
        # Collect examples of successful attacks
try:
orig_imgs = (np.clip(X.transpose(0, 2, 3, 1), 0, 1) * 255).astype(np.uint8)
adv_imgs = (np.clip(attacked_X.transpose(0, 2, 3, 1), 0, 1) * 255).astype(np.uint8)
from src.utils import draw_detection_labels, encode_b64_image
examples = []
for i in range(len(X)):
original_sample = draw_detection_labels(orig_imgs[i].copy(), orig_labeled_predict[i])
adversarial_sample = draw_detection_labels(adv_imgs[i].copy(), adv_labeled_predict[i])
examples.append({
"original": encode_b64_image(original_sample),
"adversarial": encode_b64_image(adversarial_sample),
"original_label": orig_labeled_predict[i],
"attacked_label": adv_labeled_predict[i],
})
attack_data["attack_examples"] = examples
        except Exception:
            pass  # examples are optional; keep the empty placeholder on failure
return attack_data
def generate(self, X, y=None):
"""
Генерация вредоносных примеров
Args:
X (np.ndarray): Входные данные
y (np.ndarray): Целевые значения
Returns:
np.ndarray: Атакованные данные
"""
return self.attack.generate(x=X, y=y)
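
# --- Targeted-mode configuration sketch (illustrative) ---------------------------
# A hedged example of configuring a DetectionAttack subclass for a targeted run;
# `SomeDetectionAttack` is a hypothetical subclass name:
#
#     attack = SomeDetectionAttack(
#         estimator,
#         {"type": "some_detection_attack"},
#         targeted=True,
#         change_class="cat",   # resolved to an ID via estimator.model.classes
#         target_class="dog",
#     )
#     report = attack.run(X)    # with y=None the model's own predictions are relabeled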
def change_labels_for_attack(
labels_orig: list[dict[str, 'np.ndarray | torch.Tensor']],
change_class: str | int | None = None,
target_class: str | int | None = None,
classes: list[str] | None = None
) -> list[dict[str, 'np.ndarray | torch.Tensor']]:
"""
Modifies label classes for a targeted attack scenario.
Takes a list of annotations and, depending on the parameters,
either removes certain labels or changes their class to a target.
Args:
labels_orig: A list of annotations, where each item is a dictionary with:
- 'boxes': np.ndarray or torch.Tensor of shape (N, 4)
- 'labels': np.ndarray or torch.Tensor of length N with integer class IDs
- 'scores': np.ndarray or torch.Tensor of length N with float confidence scores
change_class: The class to be changed or removed.
Can be a string (class name) or an integer (class ID).
If None, all labels will be considered as the source class.
target_class: The new class to assign to change_class labels.
Can be a string (class name) or an integer (class ID).
If None, labels of change_class will be removed.
classes: A list of class names. Required if either change_class or
target_class is specified as a string.
Returns:
A new list of annotations with the same keys ('boxes', 'labels', 'scores'),
but with modified label values.
Example Scenarios:
- change_class=None, target_class=None:
Removes **all** labels.
- change_class='cat', target_class=None:
Removes all 'cat' class labels.
- change_class=None, target_class='dog':
Converts all labels to class 'dog'.
- change_class='cat', target_class='dog':
Converts 'cat' labels to 'dog'; all others remain unchanged.
"""
if isinstance(change_class, str):
if classes is None:
raise ValueError("A list of classes must be provided when change_class is a string.")
if change_class not in classes:
raise ValueError(f"Class '{change_class}' not found in classes.")
change_id = classes.index(change_class)
else:
change_id = change_class
if isinstance(target_class, str):
if classes is None:
raise ValueError("A list of classes must be provided when target_class is a string.")
if target_class not in classes:
raise ValueError(f"Class '{target_class}' not found in classes.")
target_id = classes.index(target_class)
else:
target_id = target_class
if change_id is None and target_id is None:
return [
{
'boxes': l['boxes'][:0],
'scores': l['scores'][:0],
'labels': l['labels'][:0],
} for l in labels_orig
]
labels = copy.deepcopy(labels_orig)
if change_id is not None and target_id is None:
for l in labels:
keep = l['labels'] != change_id
l['boxes'] = l['boxes'][keep]
l['scores'] = l['scores'][keep]
l['labels'] = l['labels'][keep]
return labels
if change_id is None and target_id is not None:
for l in labels:
l['labels'][:] = target_id
return labels
for l in labels:
l['labels'][l['labels'] == change_id] = target_id
return labels
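
# --- Usage sketch (illustrative) --------------------------------------------------
# How change_labels_for_attack transforms a single annotation; the toy arrays
# and class list below are hypothetical:
#
#     labels = [{
#         'boxes': np.array([[0, 0, 10, 10], [5, 5, 20, 20]], dtype=np.float32),
#         'labels': np.array([0, 1]),   # 0 = 'cat', 1 = 'dog'
#         'scores': np.array([0.9, 0.8], dtype=np.float32),
#     }]
#     # Relabel every 'cat' box as 'dog'; other boxes are untouched.
#     out = change_labels_for_attack(labels, change_class='cat', target_class='dog',
#                                    classes=['cat', 'dog'])
#     assert out[0]['labels'].tolist() == [1, 1]
#     # Drop all 'dog' boxes instead.
#     out = change_labels_for_attack(labels, change_class='dog', classes=['cat', 'dog'])
#     assert out[0]['labels'].tolist() == [0]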