"""
Вспомогательные утилиты для тестирования ML моделей и атак.
Этот модуль содержит функции для подготовки тестового окружения, запуска атак,
обработки результатов и валидации отчетов. Все функции предназначены для
использования в тестах pytest.
"""
import subprocess
import sys
import os
import re
import json
import yaml
from pathlib import Path
import logging
# Настройка логирования для тестов
logger = logging.getLogger(__name__)
[документация]
def get_project_root():
"""Возвращает абсолютный путь к корню проекта.
Определяет корневую директорию проекта как родительскую директорию
относительно текущего файла.
:return: Путь к корневой директории проекта
:rtype: pathlib.Path
"""
return Path(__file__).parent.parent
[документация]
def create_temp_config(original_config_path: str, temp_dir: str) -> str:
"""Создаёт временный конфиг с изменёнными путями для отчётов."""
# Определяем формат файла по расширению
original_path = Path(original_config_path)
with open(original_config_path, 'r', encoding='utf-8') as f:
if original_path.suffix.lower() in ['.yaml', '.yml']:
# Читаем как YAML
config = yaml.safe_load(f)
else:
# Читаем как JSON
config = json.load(f)
# Обновляем пути к отчётам на временные
config["reports"] = {
"base_dir": str(temp_dir),
"subdirs": {
"json": "json/",
"html": "html/"
}
}
# Создаём временный конфиг с тем же расширением, что и оригинальный
temp_config_path = os.path.join(temp_dir, f"temp_config{original_path.suffix}")
with open(temp_config_path, 'w', encoding='utf-8') as f:
if original_path.suffix.lower() in ['.yaml', '.yml']:
# Сохраняем как YAML
yaml.dump(config, f, default_flow_style=False, allow_unicode=True, indent=2)
else:
# Сохраняем как JSON
json.dump(config, f, indent=2, ensure_ascii=False)
return temp_config_path
[документация]
def run_attack(config_path: str):
"""Запускает атаку через командную строку с использованием venv.
Выполняет запуск основного скрипта checkai.py с указанным
конфигурационным файлом в отдельном subprocess. Возвращает
результат выполнения для дальнейшего анализа.
:param config_path: Путь к конфигурационному файлу для запуска атаки
:type config_path: str
:return: Результат выполнения subprocess с stdout, stderr и кодом возврата
:rtype: subprocess.CompletedProcess
"""
project_root = get_project_root()
script_path = project_root / "checkai.py"
config_file_path = Path(config_path)
# Проверяем существование файлов
assert script_path.exists(), f"checkai.py not found at {script_path}"
assert config_file_path.exists(), f"Config file not found at {config_path}"
# Настройка окружения для UTF-8
env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
command = [
sys.executable,
str(script_path),
"--config",
str(config_file_path)
]
result = subprocess.run(
command,
capture_output=True,
text=True,
cwd=project_root,
encoding="utf-8",
errors="replace",
env=env
)
return result
[документация]
def find_report_paths(stdout: str, stderr: str, temp_dir: str):
"""Извлекает пути к отчетам из логов.
Анализирует stdout и stderr для извлечения путей к созданным
JSON и HTML отчетам. Обрабатывает специальные случаи для
моделей YOLO, которые сохраняют отчеты в подкаталогах.
:param stdout: Стандартный вывод выполненного процесса
:type stdout: str
:param stderr: Стандартный поток ошибок выполненного процесса
:type stderr: str
:param temp_dir: Временная директория, куда были сохранены отчеты
:type temp_dir: str
:return: Кортеж из пути к JSON отчету и пути к HTML отчету, или (None, None) при ошибке
:rtype: tuple[str, str] or tuple[None, None]
"""
# Извлекаем имя файла из логов
full_output = stdout + stderr
pattern = r"Full report saved to .+?[/\\](attack_report_.+?\.json)"
match = re.search(pattern, full_output)
if match:
filename = match.group(1)
logger.info(f"Found report filename: {filename}")
# Формируем полные пути в временной директории
json_dir = os.path.join(temp_dir, "json")
html_dir = os.path.join(temp_dir, "html")
# Ищем упоминание YOLO в логах для подкаталогов
yolo_pattern = r"attack_report_\d+_\d+"
yolo_match = re.search(yolo_pattern, full_output)
if yolo_match and "yolo" in full_output.lower():
subdir = yolo_match.group(0)
json_report_path = os.path.join(json_dir, subdir, filename)
html_report_path = os.path.join(html_dir, subdir, filename.replace(".json", ".html"))
else:
json_report_path = os.path.join(json_dir, filename)
html_report_path = os.path.join(html_dir, filename.replace(".json", ".html"))
logger.info(f"JSON report path: {json_report_path}")
logger.info(f"HTML report path: {html_report_path}")
return json_report_path, html_report_path
# Сохраним вывод для отладки
debug_path = os.path.join(os.path.dirname(temp_dir), "test_output_debug.log")
with open(debug_path, "w", encoding="utf-8") as f:
f.write("=== STDOUT ===\n")
f.write(stdout)
f.write("\n=== STDERR ===\n")
f.write(stderr)
logger.error(f"Report paths not found! Full output saved to {debug_path}")
return None, None
[документация]
def load_and_validate_report(json_report_path, tmp_path):
"""Загружает и валидирует JSON отчет.
Загружает JSON отчет, проверяет его структуру и валидирует
наличие обязательных полей. Сохраняет форматированный отчет
для отладки и обрабатывает ошибки парсинга JSON.
:param json_report_path: Путь к JSON файлу отчета
:type json_report_path: str or pathlib.Path
:param tmp_path: Временная директория для сохранения отладочной информации
:type tmp_path: pathlib.Path
:return: Словарь с данными загруженного отчета
:rtype: dict
:raises AssertionError: Если структура отчета некорректна или JSON невалиден
"""
report_debug_path = tmp_path / "test_report_content.log"
with open(json_report_path, 'r', encoding='utf-8') as f:
# Декодируем JSON
try:
report_data = json.load(f)
# Сохраняем форматированный JSON для отладки
with open(report_debug_path, "w", encoding="utf-8") as debug_file:
json.dump(report_data, debug_file, indent=2, ensure_ascii=False)
logger.info(f"Parsed report content saved to {report_debug_path}")
except json.JSONDecodeError as e:
# Если не удалось декодировать, сохраняем сырой контент для анализа
f.seek(0)
raw_content = f.read()
with open(report_debug_path, "w", encoding="utf-8") as debug_file:
debug_file.write("FAILED TO PARSE JSON:\n")
debug_file.write(str(e) + "\n\n")
debug_file.write("RAW CONTENT:\n")
debug_file.write(raw_content)
logger.error(f"Failed to parse JSON report. Content saved to {report_debug_path}")
raise AssertionError(f"Invalid JSON format in report: {e}")
# Проверяем структуру JSON отчета
assert "metadata" in report_data, "metadata section not found in report"
assert "metrics" in report_data, "metrics section not found in report"
assert "examples" in report_data, "examples section not found in report"
metadata = report_data["metadata"]
assert "model_type" in metadata, "model_type not found in metadata"
assert "model_name" in metadata, "model_name not found in metadata"
assert "dataset_name" in metadata, "dataset_name not found in metadata"
assert "timestamp" in metadata, "timestamp not found in metadata"
# Проверяем что есть хотя бы одна атака
metrics_data = report_data.get("metrics", [])
assert isinstance(metrics_data, list), "metrics should be a list"
assert len(metrics_data) > 0, "No attack metrics found in report"
# Проверяем структуру каждой атаки
for i, attack_metric in enumerate(metrics_data):
assert "attack_type" in attack_metric, f"attack_type not found in metric #{i}"
return report_data