Source code for abs_art_tabular.core.attacks.attack

import copy
import time
import datetime
import numpy as np
import torch
from torchmetrics.functional.detection.map import mean_average_precision
from abs_art_tabular.core.interfaces.base_attack import BaseAttack


def collect_attack_examples(X, attacked_X, y, original_predict, attacked_predict, n=5):
    """
    Collects examples of successful attacks together with class labels.

    Args:
        X (np.ndarray): Original data
        attacked_X (np.ndarray): Attacked data
        y (np.ndarray): Ground-truth labels
        original_predict (np.ndarray): Predictions before the attack
        attacked_predict (np.ndarray): Predictions after the attack
        n (int): Number of examples to return

    Returns:
        dict: Examples of successful attacks, selected by several strategies
    """
    successful_indices = np.where(original_predict != attacked_predict)[0]
    if len(successful_indices) == 0:
        return {"random": [], "minimal_changes": [], "first": []}

    # Per-sample total feature change (L1 norm of the perturbation)
    diff = np.abs(attacked_X[successful_indices] - X[successful_indices])
    feature_changes = np.sum(diff, axis=1)

    # Sort successful attacks by smallest total change
    sorted_indices = successful_indices[np.argsort(feature_changes)]

    # Three selection strategies: random, minimal change, first encountered
    random_indices = np.random.choice(sorted_indices, min(n, len(sorted_indices)), replace=False)
    minimal_change_indices = sorted_indices[:min(n, len(sorted_indices))]
    first_n_indices = successful_indices[:min(n, len(successful_indices))]

    # Build the example records
    def create_examples(indices):
        return [
            {
                "original": X[i].tolist(),
                "adversarial": attacked_X[i].tolist(),
                "original_label": int(y[i]),
                "attacked_label": int(attacked_predict[i]),
            }
            for i in indices
        ]

    examples = {
        "random": create_examples(random_indices),
        "minimal_changes": create_examples(minimal_change_indices),
        "first": create_examples(first_n_indices),
    }
    return examples

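# Usage sketch for collect_attack_examples (illustrative, synthetic data; not
# part of the module). Two of four predictions are flipped, so two successful
# attacks are collected per strategy:
#
#     X = np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6], [0.7, 0.8]])
#     attacked_X = X + np.array([[0.0, 0.0], [0.05, 0.0], [0.0, 0.2], [0.0, 0.0]])
#     y = np.array([0, 1, 0, 1])
#     original_predict = np.array([0, 1, 0, 1])
#     attacked_predict = np.array([0, 0, 1, 1])  # samples 1 and 2 flipped
#     examples = collect_attack_examples(X, attacked_X, y,
#                                        original_predict, attacked_predict, n=2)
#     # examples["minimal_changes"][0] is the flipped sample with the smallest
#     # L1 perturbation (sample 1: total change 0.05)
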
class TabularAttack(BaseAttack):
    """
    Base class for attacks on tabular data.
    """

    _art_attack_cls = None

    def __init__(self, estimator, attack_config, **params):
        """
        Initializes the attack.

        Args:
            estimator: Estimator (model wrapper) object
            attack_config (dict): Attack configuration
            params (dict): Additional parameters
        """
        self.estimator = estimator
        self.attack_config = attack_config
        self.params = params
        self.attack = self._art_attack_cls(self.estimator, **self.params)

    def run(self, X, y=None):
        """
        Runs the attack.

        Args:
            X (np.ndarray): Input data
            y (np.ndarray): Target values

        Returns:
            dict: Attack results
        """
        start_time = time.time()

        # Reset the model query counter
        if hasattr(self.estimator, "predict_count"):
            self.estimator.predict_count = 0

        # Predictions before the attack
        original_predict = np.argmax(self.estimator.predict(X), axis=1)

        # Generate adversarial examples
        attacked_X = self.generate(X, y)

        # Predictions after the attack
        attacked_predict = np.argmax(self.estimator.predict(attacked_X), axis=1)

        # Queries accumulated during the attack (-1 if the estimator does not count them)
        total_model_queries = getattr(self.estimator, "predict_count", -1)

        # Count successful attacks
        successful_attacks = int(np.sum(original_predict != attacked_predict))
        success_rate = successful_attacks / len(X)

        # Compute accuracy before and after the attack
        from sklearn.metrics import accuracy_score
        original_acc = accuracy_score(y, original_predict)
        attacked_acc = accuracy_score(y, attacked_predict)

        # Per-feature perturbation statistics
        diff = np.abs(attacked_X - X)
        max_changes = np.max(diff, axis=0).tolist()
        avg_changes = np.mean(diff, axis=0).tolist()
        modified_features = (diff > 1e-6).sum(axis=0).tolist()
        feature_changes = {
            "max": max_changes,
            "avg": avg_changes,
            "count": modified_features,
        }

        # Assemble the report data (attack only)
        attack_data = {
            "attack_type": self.attack_config["type"],
            "task_type": "classification",
            "attack_params": {
                **self.attack_config.get("params", {}),
                "feature_changes": feature_changes,
            },
            "original_metrics": {"accuracy": original_acc},
            "attacked_metrics": {"accuracy": attacked_acc},
            "metric": "accuracy",
            "attack_duration": time.time() - start_time,
            "total_model_queries": total_model_queries,
            "successful_attacks": successful_attacks,
            "attack_success_rate": success_rate,
            "attack_examples": {},  # filled in below
            "timestamp": datetime.datetime.now().isoformat(),
        }

        # Collect examples of successful attacks
        try:
            attack_examples = collect_attack_examples(
                X, attacked_X, y, original_predict, attacked_predict, n=5)
            attack_data["attack_examples"] = attack_examples
        except Exception:
            attack_data["attack_examples"] = {"random": [], "minimal_changes": [], "first": []}

        return attack_data

    def generate(self, X, y=None):
        """
        Generates adversarial examples.

        Args:
            X (np.ndarray): Input data
            y (np.ndarray): Target values

        Returns:
            np.ndarray: Attacked data
        """
        return self.attack.generate(x=X, y=y)

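# Usage sketch for TabularAttack (illustrative; the concrete attack class and
# parameters are assumptions, not part of the module). _art_attack_cls must be
# set to an ART attack class, e.g. HopSkipJump from the
# adversarial-robustness-toolbox package:
#
#     from art.attacks.evasion import HopSkipJump
#
#     class HopSkipJumpTabularAttack(TabularAttack):
#         _art_attack_cls = HopSkipJump
#
#     attack = HopSkipJumpTabularAttack(
#         estimator,  # an ART-compatible classifier wrapper
#         attack_config={"type": "hop_skip_jump", "params": {"max_iter": 10}},
#         max_iter=10,
#     )
#     report = attack.run(X_test, y_test)
#     print(report["attack_success_rate"], report["attacked_metrics"]["accuracy"])

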
class DetectionAttack(BaseAttack):
    """
    Base class for attacks on detection tasks.
    """

    _art_attack_cls = None

    def __init__(self, estimator, attack_config, **params):
        """
        Initializes the detection attack.

        Args:
            estimator: Estimator (model wrapper) object
            attack_config (dict): Attack configuration
            params (dict): Additional parameters
        """
        self.estimator = estimator
        self.attack_config = attack_config
        self.params = params
        self.targeted = params.get("targeted", False)
        self.change_class = params.get("change_class", None)
        self.target_class = params.get("target_class", None)
        self.classes = None
        if self.targeted and (isinstance(self.change_class, str) or isinstance(self.target_class, str)):
            try:
                self.classes = self.estimator.model.classes
            except AttributeError:
                import logging
                logger = logging.getLogger()
                logger.info(
                    "Attack is set to targeted mode with the change/target class "
                    "given as a string name, but the model exposes no class list; "
                    "switching to untargeted mode")
                self.targeted = False
                params["targeted"] = False
        # change_class/target_class are consumed here and must not reach the ART attack
        if "change_class" in self.params:
            del self.params["change_class"]
        if "target_class" in self.params:
            del self.params["target_class"]
        self.attack = self._art_attack_cls(self.estimator, **self.params)

    def run(self, X, y=None):
        """
        Runs the detection attack.

        Args:
            X (np.ndarray): Input data
            y (np.ndarray): Target values

        Returns:
            dict: Attack results
        """
        start_time = time.time()

        # Predictions before the attack
        original_predict = self.estimator.model.predict(X)

        # Predictions with postprocessing
        start_orig_process_time = time.time()
        orig_labeled_predict = self.estimator.model.predict(X, postprocess=True)
        orig_process_time = time.time() - start_orig_process_time

        # Generate adversarial examples
        if self.targeted:
            y = change_labels_for_attack(
                y if y is not None else original_predict,
                self.change_class, self.target_class, self.classes)
            attacked_X = self.generate(X, y)
        else:
            attacked_X = self.generate(X, None)

        # Predictions after the attack
        attacked_predict = self.estimator.model.predict(attacked_X)

        # Predictions with postprocessing
        start_attacked_process_time = time.time()
        adv_labeled_predict = self.estimator.model.predict(attacked_X, postprocess=True)
        attacked_process_time = time.time() - start_attacked_process_time

        # Per-image mAP of post-attack detections against pre-attack ones
        maps = list()
        for i in range(len(X)):
            metrics = mean_average_precision(
                preds=[attacked_predict[i]],
                target=[original_predict[i]],
                backend='faster_coco_eval')
            maps.append(metrics['map'])
        maps = np.array(maps)

        # An attack counts as successful when the per-image mAP drops to 0.15 or below
        successful_attacks = int(np.sum(maps <= 0.15))
        success_rate = successful_attacks / len(X)

        # Perturbation statistics over the whole batch
        diff = np.abs(attacked_X - X)
        max_changes = np.max(diff).tolist()
        avg_changes = np.mean(diff).tolist()
        modified_features = (diff > 1e-6).sum(axis=(1, 2, 3)).tolist()
        feature_changes = {
            "max": max_changes,
            "avg": avg_changes,
            "count": modified_features,
        }

        # Assemble the report data (attack only)
        attack_data = {
            "attack_type": self.attack_config["type"],
            "attack_params": {
                **self.attack_config.get("params", {}),
                "feature_changes": feature_changes,
            },
            "metrics": {
                "map": {"original": 1, "attacked": float(maps.mean())},
                "inference_time": {"original": orig_process_time,
                                   "attacked": attacked_process_time},
            },
            "original_metrics": {"map": 1},
            "attacked_metrics": {"map": float(maps.mean())},
            "metric": "map",
            "attack_duration": time.time() - start_time,
            "total_model_queries": -1,  # TODO
            "successful_attacks": successful_attacks,
            "attack_success_rate": success_rate,
            "attack_examples": {},  # filled in below
            "timestamp": datetime.datetime.now().isoformat(),
        }

        # Collect examples of successful attacks as annotated images
        try:
            orig_imgs = (np.clip(X.transpose(0, 2, 3, 1), 0, 1) * 255).astype(np.uint8)
            adv_imgs = (np.clip(attacked_X.transpose(0, 2, 3, 1), 0, 1) * 255).astype(np.uint8)
            from src.utils import draw_detection_labels, encode_b64_image
            examples = []
            for i in range(len(X)):
                original_sample = draw_detection_labels(orig_imgs[i].copy(), orig_labeled_predict[i])
                adversarial_sample = draw_detection_labels(adv_imgs[i].copy(), adv_labeled_predict[i])
                examples.append({
                    "original": encode_b64_image(original_sample),
                    "adversarial": encode_b64_image(adversarial_sample),
                    "original_label": orig_labeled_predict[i],
                    "attacked_label": adv_labeled_predict[i],
                })
            attack_data["attack_examples"] = examples
        except Exception:
            pass

        return attack_data

    def generate(self, X, y=None):
        """
        Generates adversarial examples.

        Args:
            X (np.ndarray): Input data
            y (np.ndarray): Target values

        Returns:
            np.ndarray: Attacked data
        """
        return self.attack.generate(x=X, y=y)

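# Illustrative check of the per-image success criterion used by
# DetectionAttack.run (synthetic data is an assumption; this uses the
# class-based torchmetrics metric rather than the functional one imported
# above, and needs a COCO-eval backend installed). An image counts as
# successfully attacked when the mAP of the post-attack detections, scored
# against the pre-attack detections, is at most 0.15:
#
#     from torchmetrics.detection.mean_ap import MeanAveragePrecision
#
#     target = [{"boxes": torch.tensor([[10.0, 10.0, 50.0, 50.0]]),
#                "labels": torch.tensor([1])}]
#     preds = [{"boxes": torch.tensor([[200.0, 200.0, 240.0, 240.0]]),  # far from target
#               "scores": torch.tensor([0.9]),
#               "labels": torch.tensor([1])}]
#     metric = MeanAveragePrecision()
#     metric.update(preds, target)
#     map_value = metric.compute()["map"].item()  # 0.0 for non-overlapping boxes
#     assert map_value <= 0.15  # this image counts as a successful attack

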
def change_labels_for_attack(
        labels_orig: list[dict[str, 'np.ndarray | torch.Tensor']],
        change_class: str | int | None = None,
        target_class: str | int | None = None,
        classes: list[str] | None = None
) -> list[dict[str, 'np.ndarray | torch.Tensor']]:
    """
    Modifies label classes for a targeted attack scenario.

    Takes a list of annotations and, depending on the parameters, either removes
    certain labels or changes their class to a target.

    Args:
        labels_orig: A list of annotations, where each item is a dictionary with:
            - 'boxes': np.ndarray or torch.Tensor of shape (N, 4)
            - 'labels': np.ndarray or torch.Tensor of length N with integer class IDs
            - 'scores': np.ndarray or torch.Tensor of length N with float confidence scores
        change_class: The class to be changed or removed. Can be a string (class name)
            or an integer (class ID). If None, all labels are treated as the source class.
        target_class: The new class to assign to change_class labels. Can be a string
            (class name) or an integer (class ID). If None, labels of change_class
            are removed.
        classes: A list of class names. Required if either change_class or
            target_class is specified as a string.

    Returns:
        A new list of annotations with the same keys ('boxes', 'labels', 'scores'),
        but with modified label values.

    Example Scenarios:
        - change_class=None, target_class=None: Removes **all** labels.
        - change_class='cat', target_class=None: Removes all 'cat' class labels.
        - change_class=None, target_class='dog': Converts all labels to class 'dog'.
        - change_class='cat', target_class='dog': Converts 'cat' labels to 'dog';
          all others remain unchanged.
    """
    if isinstance(change_class, str):
        if classes is None:
            raise ValueError("A list of classes must be provided when change_class is a string.")
        if change_class not in classes:
            raise ValueError(f"Class '{change_class}' not found in classes.")
        change_id = classes.index(change_class)
    else:
        change_id = change_class

    if isinstance(target_class, str):
        if classes is None:
            raise ValueError("A list of classes must be provided when target_class is a string.")
        if target_class not in classes:
            raise ValueError(f"Class '{target_class}' not found in classes.")
        target_id = classes.index(target_class)
    else:
        target_id = target_class

    # No source and no target class: drop every label
    if change_id is None and target_id is None:
        return [
            {
                'boxes': ann['boxes'][:0],
                'scores': ann['scores'][:0],
                'labels': ann['labels'][:0],
            }
            for ann in labels_orig
        ]

    labels = copy.deepcopy(labels_orig)

    # Source class given, no target: remove all labels of the source class
    if change_id is not None and target_id is None:
        for ann in labels:
            keep = ann['labels'] != change_id
            ann['boxes'] = ann['boxes'][keep]
            ann['scores'] = ann['scores'][keep]
            ann['labels'] = ann['labels'][keep]
        return labels

    # Target class given, no source: relabel everything as the target class
    if change_id is None and target_id is not None:
        for ann in labels:
            ann['labels'][:] = target_id
        return labels

    # Both given: relabel only the source class as the target class
    for ann in labels:
        ann['labels'][ann['labels'] == change_id] = target_id
    return labels
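
# Usage sketch for change_labels_for_attack (illustrative, synthetic
# annotations; not part of the module). Turning every 'cat' detection into
# a 'dog' for a targeted attack:
#
#     classes = ['cat', 'dog']
#     labels_orig = [{
#         'boxes': np.array([[0., 0., 10., 10.], [5., 5., 20., 20.]]),
#         'labels': np.array([0, 1]),   # one 'cat', one 'dog'
#         'scores': np.array([0.9, 0.8]),
#     }]
#     out = change_labels_for_attack(labels_orig, change_class='cat',
#                                    target_class='dog', classes=classes)
#     print(out[0]['labels'])  # -> [1 1]; the 'cat' box is now labeled 'dog'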