Files
Touchh/hotels/pms_check.py

92 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
from datetime import datetime, timedelta
import requests
from django.db import models
from asgiref.sync import sync_to_async
class APIDataLogger:
"""Класс для работы с API, сохранения и обработки данных."""
def __init__(self, name, url, token=None, username=None, password=None):
self.name = name
self.url = url
self.token = token
self.username = username
self.password = password
self.output_dir = "modules"
def ensure_directory_exists(self, path):
"""Создать директорию, если она не существует."""
if not os.path.exists(path):
os.makedirs(path)
def fetch_data(self, additional_data=None):
"""Получить данные из API."""
headers = {"Content-Type": "application/json"}
data = additional_data or {}
if self.token:
data["token"] = self.token
response = requests.post(self.url, headers=headers, json=data, auth=(self.username, self.password) if self.username and self.password else None)
if response.status_code != 200:
print(f'{self.name}: API запрос не удался. Код статуса: {response.status_code}')
return []
return response.json()
def save_data(self, data, suffix):
"""Сохранить данные в файл JSON."""
now = datetime.now()
current_date = now.strftime('%Y-%m-%d')
directory = os.path.join(self.output_dir, current_date, self.name)
self.ensure_directory_exists(directory)
filename = f"{self.name} {suffix}.json"
filepath = os.path.join(directory, filename)
with open(filepath, 'w') as file:
json.dump(data, file)
return filepath
def load_previous_data(self, suffixes):
"""Загрузить данные из файлов за текущий и предыдущий интервалы."""
now = datetime.now()
current_date = now.strftime('%Y-%m-%d')
yesterday_date = (now - timedelta(days=1)).strftime('%Y-%m-%d')
directories = [(yesterday_date, "21"), (current_date, "9")] if 9 <= now.hour < 21 else [(current_date, "9"), (current_date, "21")]
data_combined = []
for date, suffix in directories:
filepath = os.path.join(self.output_dir, date, self.name, f"{self.name} {suffix}.json")
try:
with open(filepath, 'r') as file:
data_combined.extend(json.load(file))
except FileNotFoundError:
pass
return data_combined
def filter_data(self, data, filter_function):
"""Фильтрация данных с использованием переданной функции."""
return filter_function(data)
def process_and_save(self, additional_data=None, filter_function=None):
"""Основной процесс: запрос, сохранение, чтение, фильтрация."""
now = datetime.now()
suffix = "9" if 9 <= now.hour < 21 else "21"
# Шаг 1: Получить данные
raw_data = self.fetch_data(additional_data)
self.save_data(raw_data, suffix)
# Шаг 2: Загрузить данные за текущий и предыдущий интервал
combined_data = self.load_previous_data(["9", "21"])
# Шаг 3: Фильтрация
if filter_function:
combined_data = self.filter_data(combined_data, filter_function)
return combined_data