Исходный код test_suite.test_utils

"""
Вспомогательные утилиты для тестирования ML моделей и атак.

Этот модуль содержит функции для подготовки тестового окружения, запуска атак,
обработки результатов и валидации отчетов. Все функции предназначены для
использования в тестах pytest.
"""
import subprocess
import sys
import os
import re
import json
import yaml
from pathlib import Path
import logging

# Настройка логирования для тестов
logger = logging.getLogger(__name__)

[документация] def get_project_root(): """Возвращает абсолютный путь к корню проекта. Определяет корневую директорию проекта как родительскую директорию относительно текущего файла. :return: Путь к корневой директории проекта :rtype: pathlib.Path """ return Path(__file__).parent.parent
[документация] def create_temp_config(original_config_path: str, temp_dir: str) -> str: """Создаёт временный конфиг с изменёнными путями для отчётов.""" # Определяем формат файла по расширению original_path = Path(original_config_path) with open(original_config_path, 'r', encoding='utf-8') as f: if original_path.suffix.lower() in ['.yaml', '.yml']: # Читаем как YAML config = yaml.safe_load(f) else: # Читаем как JSON config = json.load(f) # Обновляем пути к отчётам на временные config["reports"] = { "base_dir": str(temp_dir), "subdirs": { "json": "json/", "html": "html/" } } # Создаём временный конфиг с тем же расширением, что и оригинальный temp_config_path = os.path.join(temp_dir, f"temp_config{original_path.suffix}") with open(temp_config_path, 'w', encoding='utf-8') as f: if original_path.suffix.lower() in ['.yaml', '.yml']: # Сохраняем как YAML yaml.dump(config, f, default_flow_style=False, allow_unicode=True, indent=2) else: # Сохраняем как JSON json.dump(config, f, indent=2, ensure_ascii=False) return temp_config_path
[документация] def run_attack(config_path: str): """Запускает атаку через командную строку с использованием venv. Выполняет запуск основного скрипта checkai.py с указанным конфигурационным файлом в отдельном subprocess. Возвращает результат выполнения для дальнейшего анализа. :param config_path: Путь к конфигурационному файлу для запуска атаки :type config_path: str :return: Результат выполнения subprocess с stdout, stderr и кодом возврата :rtype: subprocess.CompletedProcess """ project_root = get_project_root() script_path = project_root / "checkai.py" config_file_path = Path(config_path) # Проверяем существование файлов assert script_path.exists(), f"checkai.py not found at {script_path}" assert config_file_path.exists(), f"Config file not found at {config_path}" # Настройка окружения для UTF-8 env = os.environ.copy() env["PYTHONIOENCODING"] = "utf-8" command = [ sys.executable, str(script_path), "--config", str(config_file_path) ] result = subprocess.run( command, capture_output=True, text=True, cwd=project_root, encoding="utf-8", errors="replace", env=env ) return result
[документация] def find_report_paths(stdout: str, stderr: str, temp_dir: str): """Извлекает пути к отчетам из логов. Анализирует stdout и stderr для извлечения путей к созданным JSON и HTML отчетам. Обрабатывает специальные случаи для моделей YOLO, которые сохраняют отчеты в подкаталогах. :param stdout: Стандартный вывод выполненного процесса :type stdout: str :param stderr: Стандартный поток ошибок выполненного процесса :type stderr: str :param temp_dir: Временная директория, куда были сохранены отчеты :type temp_dir: str :return: Кортеж из пути к JSON отчету и пути к HTML отчету, или (None, None) при ошибке :rtype: tuple[str, str] or tuple[None, None] """ # Извлекаем имя файла из логов full_output = stdout + stderr pattern = r"Full report saved to .+?[/\\](attack_report_.+?\.json)" match = re.search(pattern, full_output) if match: filename = match.group(1) logger.info(f"Found report filename: {filename}") # Формируем полные пути в временной директории json_dir = os.path.join(temp_dir, "json") html_dir = os.path.join(temp_dir, "html") # Ищем упоминание YOLO в логах для подкаталогов yolo_pattern = r"attack_report_\d+_\d+" yolo_match = re.search(yolo_pattern, full_output) if yolo_match and "yolo" in full_output.lower(): subdir = yolo_match.group(0) json_report_path = os.path.join(json_dir, subdir, filename) html_report_path = os.path.join(html_dir, subdir, filename.replace(".json", ".html")) else: json_report_path = os.path.join(json_dir, filename) html_report_path = os.path.join(html_dir, filename.replace(".json", ".html")) logger.info(f"JSON report path: {json_report_path}") logger.info(f"HTML report path: {html_report_path}") return json_report_path, html_report_path # Сохраним вывод для отладки debug_path = os.path.join(os.path.dirname(temp_dir), "test_output_debug.log") with open(debug_path, "w", encoding="utf-8") as f: f.write("=== STDOUT ===\n") f.write(stdout) f.write("\n=== STDERR ===\n") f.write(stderr) logger.error(f"Report paths not found! Full output saved to {debug_path}") return None, None
[документация] def load_and_validate_report(json_report_path, tmp_path): """Загружает и валидирует JSON отчет. Загружает JSON отчет, проверяет его структуру и валидирует наличие обязательных полей. Сохраняет форматированный отчет для отладки и обрабатывает ошибки парсинга JSON. :param json_report_path: Путь к JSON файлу отчета :type json_report_path: str or pathlib.Path :param tmp_path: Временная директория для сохранения отладочной информации :type tmp_path: pathlib.Path :return: Словарь с данными загруженного отчета :rtype: dict :raises AssertionError: Если структура отчета некорректна или JSON невалиден """ report_debug_path = tmp_path / "test_report_content.log" with open(json_report_path, 'r', encoding='utf-8') as f: # Декодируем JSON try: report_data = json.load(f) # Сохраняем форматированный JSON для отладки with open(report_debug_path, "w", encoding="utf-8") as debug_file: json.dump(report_data, debug_file, indent=2, ensure_ascii=False) logger.info(f"Parsed report content saved to {report_debug_path}") except json.JSONDecodeError as e: # Если не удалось декодировать, сохраняем сырой контент для анализа f.seek(0) raw_content = f.read() with open(report_debug_path, "w", encoding="utf-8") as debug_file: debug_file.write("FAILED TO PARSE JSON:\n") debug_file.write(str(e) + "\n\n") debug_file.write("RAW CONTENT:\n") debug_file.write(raw_content) logger.error(f"Failed to parse JSON report. Content saved to {report_debug_path}") raise AssertionError(f"Invalid JSON format in report: {e}") # Проверяем структуру JSON отчета assert "metadata" in report_data, "metadata section not found in report" assert "metrics" in report_data, "metrics section not found in report" assert "examples" in report_data, "examples section not found in report" metadata = report_data["metadata"] assert "model_type" in metadata, "model_type not found in metadata" assert "model_name" in metadata, "model_name not found in metadata" assert "dataset_name" in metadata, "dataset_name not found in metadata" assert "timestamp" in metadata, "timestamp not found in metadata" # Проверяем что есть хотя бы одна атака metrics_data = report_data.get("metrics", []) assert isinstance(metrics_data, list), "metrics should be a list" assert len(metrics_data) > 0, "No attack metrics found in report" # Проверяем структуру каждой атаки for i, attack_metric in enumerate(metrics_data): assert "attack_type" in attack_metric, f"attack_type not found in metric #{i}" return report_data