Files
nas_control_bot/.history/src/api/synology_20250830071505.py
2025-08-30 10:33:46 +09:00

448 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Модуль для взаимодействия с API Synology NAS с использованием библиотеки python-synology
"""
import requests
import json
import logging
from typing import Dict, Any, Optional, List, Tuple
import socket
import struct
from time import sleep
import urllib3
from synology_dsm import SynologyDSM
from src.config.config import (
SYNOLOGY_HOST,
SYNOLOGY_PORT,
SYNOLOGY_USERNAME,
SYNOLOGY_PASSWORD,
SYNOLOGY_SECURE,
SYNOLOGY_TIMEOUT,
SYNOLOGY_MAC,
WOL_PORT
)
# Отключение предупреждений о небезопасных SSL-соединениях
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
class SynologyAPI:
"""Класс для взаимодействия с API Synology NAS с использованием python-synology"""
def __init__(self):
"""Инициализация класса SynologyAPI"""
self.protocol = "https" if SYNOLOGY_SECURE else "http"
self.dsm = None
def login(self) -> bool:
"""Авторизация в API Synology NAS используя python-synology"""
try:
# Создаем экземпляр SynologyDSM
self.dsm = SynologyDSM(
SYNOLOGY_HOST,
port=SYNOLOGY_PORT,
username=SYNOLOGY_USERNAME,
password=SYNOLOGY_PASSWORD,
secure=SYNOLOGY_SECURE,
timeout=SYNOLOGY_TIMEOUT,
verify_ssl=False
)
# Авторизация
self.dsm.login()
logger.info("Successfully logged in to Synology NAS")
return True
except Exception as e:
logger.error(f"Failed to log in to Synology NAS: {str(e)}")
self.dsm = None
return False
def logout(self) -> bool:
"""Выход из API Synology NAS"""
if not self.dsm:
return True
try:
self.dsm.logout()
self.dsm = None
logger.info("Successfully logged out from Synology NAS")
return True
except Exception as e:
logger.error(f"Error during logout: {str(e)}")
return False
def get_system_status(self) -> Optional[Dict[str, Any]]:
"""Получение расширенного статуса системы"""
if not self.dsm and not self.login():
return None
try:
result = {
"model": self.dsm.information.model,
"version": self.dsm.information.version_string,
"uptime": self.dsm.information.uptime,
"serial": self.dsm.information.serial,
"temperature": self.dsm.information.temperature,
"temperature_unit": "C",
"cpu_usage": self.dsm.utilisation.cpu_total_load,
"memory": {
"total_mb": self.dsm.utilisation.memory_size_mb,
"available_mb": self.dsm.utilisation.memory_available_real_mb,
"usage_percent": self.dsm.utilisation.memory_real_usage
},
"network": self._get_network_info(),
"volumes": self._get_volumes_info()
}
logger.info("Successfully fetched extended system status")
return result
except Exception as e:
logger.error(f"Error getting system status: {str(e)}")
return None
def _get_network_info(self) -> List[Dict[str, Any]]:
"""Получение информации о сетевых интерфейсах"""
try:
result = []
# Получение информации о сети
for device in self.dsm.network.interfaces:
net_info = {
"device": device,
"ip": self.dsm.network.get_ip(device),
"mask": self.dsm.network.get_mask(device),
"mac": self.dsm.network.get_mac(device),
"type": self.dsm.network.get_type(device),
"rx_bytes": self.dsm.network.get_rx(device),
"tx_bytes": self.dsm.network.get_tx(device)
}
result.append(net_info)
return result
except Exception as e:
logger.error(f"Error getting network info: {str(e)}")
return []
def _get_volumes_info(self) -> List[Dict[str, Any]]:
"""Получение информации о томах хранения"""
try:
result = []
# Получение информации о томах
for volume in self.dsm.storage.volumes:
vol_info = {
"name": volume,
"status": self.dsm.storage.volume_status(volume),
"device_type": self.dsm.storage.volume_device_type(volume),
"total_size": self.dsm.storage.volume_size_total(volume),
"used_size": self.dsm.storage.volume_size_used(volume),
"percent_used": self.dsm.storage.volume_percentage_used(volume)
}
result.append(vol_info)
return result
except Exception as e:
logger.error(f"Error getting volumes info: {str(e)}")
return []
def get_shared_folders(self) -> List[Dict[str, Any]]:
"""Получение списка общих папок"""
if not self.dsm and not self.login():
return []
try:
result = []
# Получение информации о общих папках
for folder in self.dsm.share.shares:
share_info = {
"name": folder,
"path": self.dsm.share.get_info(folder).get("path", ""),
"desc": self.dsm.share.get_info(folder).get("desc", "")
}
result.append(share_info)
logger.info(f"Successfully retrieved {len(result)} shared folders")
return result
except Exception as e:
logger.error(f"Error getting shared folders: {str(e)}")
return []
def get_system_info(self) -> Dict[str, Any]:
"""Получение основной информации о системе"""
if not self.dsm and not self.login():
return {}
try:
result = {
"model": self.dsm.information.model,
"serial": self.dsm.information.serial,
"version": self.dsm.information.version_string,
"uptime": self.dsm.information.uptime
}
logger.info("Successfully fetched system info")
return result
except Exception as e:
logger.error(f"Error getting system info: {str(e)}")
return {}
def shutdown_system(self) -> bool:
"""Выключение системы"""
if not self.dsm and not self.login():
return False
try:
# Используем низкоуровневый API для отправки команды выключения
endpoint = "SYNO.DSM.System"
api_path = "entry.cgi"
req_param = {"version": 1, "method": "shutdown"}
self.dsm.post(endpoint, api_path, req_param)
logger.info("Successfully initiated system shutdown")
return True
except Exception as e:
logger.error(f"Error shutting down system: {str(e)}")
return False
def reboot_system(self) -> bool:
"""Перезагрузка системы"""
if not self.dsm and not self.login():
return False
try:
# Используем низкоуровневый API для отправки команды перезагрузки
endpoint = "SYNO.DSM.System"
api_path = "entry.cgi"
req_param = {"version": 1, "method": "reboot"}
self.dsm.post(endpoint, api_path, req_param)
logger.info("Successfully initiated system reboot")
return True
except Exception as e:
logger.error(f"Error rebooting system: {str(e)}")
return False
def is_online(self) -> bool:
"""Проверка онлайн-статуса Synology NAS"""
try:
socket_obj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_obj.settimeout(SYNOLOGY_TIMEOUT)
result = socket_obj.connect_ex((SYNOLOGY_HOST, SYNOLOGY_PORT))
socket_obj.close()
return result == 0
except socket.error:
return False
def wake_on_lan(self) -> bool:
"""Отправка Wake-on-LAN пакета для включения Synology NAS"""
if not SYNOLOGY_MAC:
logger.error("MAC address not configured")
return False
try:
# Преобразование MAC-адреса в байты
mac_address = SYNOLOGY_MAC.replace(':', '').replace('-', '')
mac_bytes = bytes.fromhex(mac_address)
# Создание Magic Packet
magic_packet = b'\xff' * 6 + mac_bytes * 16
# Отправка пакета
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(magic_packet, (SYNOLOGY_HOST, WOL_PORT))
sock.close()
logger.info(f"Wake-on-LAN packet sent to {SYNOLOGY_MAC}")
return True
except Exception as e:
logger.error(f"Error sending Wake-on-LAN packet: {str(e)}")
return False
def wait_for_boot(self, max_attempts: int = 30, delay: int = 10) -> bool:
"""Ожидание загрузки Synology NAS"""
logger.info("Waiting for Synology NAS to boot...")
for attempt in range(max_attempts):
if self.is_online():
logger.info(f"Synology NAS is online after {attempt + 1} attempts")
# Дадим дополнительное время для полной загрузки всех сервисов
sleep(delay)
return True
sleep(delay)
logger.info(f"Waiting for boot... Attempt {attempt + 1}/{max_attempts}")
logger.error(f"Synology NAS didn't come online after {max_attempts} attempts")
return False
def power_on(self) -> bool:
"""Включение Synology NAS"""
if self.is_online():
logger.info("Synology NAS is already online")
return True
# Отправка WoL пакета
if not self.wake_on_lan():
return False
# Ожидание загрузки
return self.wait_for_boot()
def power_off(self) -> bool:
"""Выключение Synology NAS"""
if not self.is_online():
logger.info("Synology NAS is already offline")
return True
return self.shutdown_system()
def get_system_load(self) -> Dict[str, Any]:
"""Получение информации о загрузке системы"""
if not self.dsm and not self.login():
return {}
try:
result = {
"cpu_load": self.dsm.utilisation.cpu_total_load,
"memory": {
"total_mb": self.dsm.utilisation.memory_size_mb,
"available_mb": self.dsm.utilisation.memory_available_real_mb,
"cached_mb": self.dsm.utilisation.memory_cached_mb,
"usage_percent": self.dsm.utilisation.memory_real_usage
},
"network": {}
}
# Добавляем данные по сети
for device in self.dsm.network.interfaces:
result["network"][device] = {
"rx_bytes": self.dsm.network.get_rx(device),
"tx_bytes": self.dsm.network.get_tx(device)
}
logger.info("Successfully fetched system load")
return result
except Exception as e:
logger.error(f"Error getting system load: {str(e)}")
return {}
def get_storage_status(self) -> Dict[str, Any]:
"""Получение подробной информации о хранилище"""
if not self.dsm and not self.login():
return {}
try:
result = {
"volumes": self._get_volumes_info(),
"disks": self._get_disks_info(),
"total_size": 0,
"total_used": 0
}
# Суммируем общий размер и использование
for volume in result["volumes"]:
result["total_size"] += volume["total_size"]
result["total_used"] += volume["used_size"]
logger.info("Successfully fetched storage status")
return result
except Exception as e:
logger.error(f"Error getting storage status: {str(e)}")
return {}
def _get_disks_info(self) -> List[Dict[str, Any]]:
"""Получение информации о дисках"""
try:
result = []
# Получение информации о дисках
for disk in self.dsm.storage.disks:
disk_info = {
"name": disk,
"model": self.dsm.storage.disk_model(disk),
"type": self.dsm.storage.disk_type(disk),
"status": self.dsm.storage.disk_status(disk),
"temp": self.dsm.storage.disk_temp(disk)
}
result.append(disk_info)
return result
except Exception as e:
logger.error(f"Error getting disks info: {str(e)}")
return []
def get_security_status(self) -> Dict[str, bool]:
"""Получение информации о состоянии безопасности"""
if not self.dsm and not self.login():
return {"success": False}
try:
# Используем низкоуровневый API для получения информации о безопасности
endpoint = "SYNO.Core.Security.DSM"
api_path = "entry.cgi"
req_param = {"version": 1, "method": "status"}
response = self.dsm.get(endpoint, api_path, req_param)
if response and "data" in response:
return {
"success": True,
"status": response["data"].get("status", "unknown"),
"last_check": response["data"].get("last_check", None),
"is_secure": response["data"].get("is_secure", False)
}
else:
return {"success": False}
except Exception as e:
logger.error(f"Error getting security status: {str(e)}")
return {"success": False}
def get_users(self) -> List[str]:
"""Получение списка пользователей"""
if not self.dsm and not self.login():
return []
try:
users = []
# Используем низкоуровневый API для получения списка пользователей
endpoint = "SYNO.Core.User"
api_path = "entry.cgi"
req_param = {"version": 1, "method": "list", "additional": ["email"]}
response = self.dsm.get(endpoint, api_path, req_param)
if response and "data" in response and "users" in response["data"]:
for user in response["data"]["users"]:
if "name" in user:
users.append(user["name"])
logger.info(f"Successfully retrieved {len(users)} users")
return users
except Exception as e:
logger.error(f"Error getting users: {str(e)}")
return []