import os import asyncio import logging import json from io import BytesIO from typing import Dict, List, Any, Optional import re import textwrap from aiogram import Bot, Dispatcher, types, F from aiogram.filters import Command from aiogram.types import Message from aiogram.enums import ParseMode from dotenv import load_dotenv from ollama import AsyncClient import asyncpg from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton # Настройка логирования logging.basicConfig(level=logging.INFO) # Загрузка переменных окружения load_dotenv() # Инициализация бота и диспетчера bot = Bot(token=os.getenv("BOT_TOKEN")) dp = Dispatcher() # Инициализация клиента Ollama ollama_client = AsyncClient(host=os.getenv("OLLAMA_HOST")) model_name = os.getenv("OLLAMA_MODEL") # Соединение с PostgreSQL postgres_pool: Optional[asyncpg.Pool] = None # Словарь для хранения контекста диалога для каждого пользователя в памяти (кэш) user_messages: Dict[int, List[Dict[str, Any]]] = {} # Словарь для хранения задач обновления статуса печати typing_tasks: Dict[int, asyncio.Task] = {} # Максимальная длина сообщения Telegram MAX_MESSAGE_LENGTH = 4096 # Задержка между отправкой разделенных сообщений SPLIT_MESSAGE_DELAY = 0.5 # Начальный промт для ИИ INITIAL_PROMPT = "Ты - Альфа, виртуальный помощник, который должен выполнять любые запросы от пользователя. Взаимодействие с тобой происходит через мессенджер Telegram, поэтому тебе важно писать текст с форматированием, допустимым для этого мессенджера (в Markdown)" async def init_db(): """Инициализация базы данных""" global postgres_pool # Создаем пул соединений с базой данных postgres_pool = await asyncpg.create_pool( host=os.getenv("POSTGRES_HOST"), port=os.getenv("POSTGRES_PORT"), database=os.getenv("POSTGRES_DB"), user=os.getenv("POSTGRES_USER"), password=os.getenv("POSTGRES_PASSWORD") ) # Создаем таблицу, если она не существует async with postgres_pool.acquire() as conn: await conn.execute(''' CREATE TABLE IF NOT EXISTS messages ( id SERIAL PRIMARY KEY, user_id BIGINT NOT NULL, message_data JSONB NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ) ''') # Создаем индекс для быстрого поиска по user_id await conn.execute(''' CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages (user_id) ''') async def save_message_to_db(user_id: int, message_data: Dict[str, Any]): """Сохранение сообщения в базу данных""" if not postgres_pool: logging.error("Пул соединений PostgreSQL не инициализирован") return # Преобразуем изображения в строку для хранения if "images" in message_data: # Обходим проблему хранения бинарных данных - просто удаляем изображения # Для реального приложения можно реализовать хранение изображений в отдельной таблице message_data_copy = message_data.copy() message_data_copy["has_image"] = True message_data_copy.pop("images", None) else: message_data_copy = message_data try: async with postgres_pool.acquire() as conn: await conn.execute( ''' INSERT INTO messages (user_id, message_data) VALUES ($1, $2) ''', user_id, json.dumps(message_data_copy) ) except Exception as e: logging.error(f"Ошибка сохранения сообщения в БД: {e}") async def load_messages_from_db(user_id: int) -> List[Dict[str, Any]]: """Загрузка сообщений из базы данных""" if not postgres_pool: logging.error("Пул соединений PostgreSQL не инициализирован") return [] try: async with postgres_pool.acquire() as conn: rows = await conn.fetch( ''' SELECT message_data FROM messages WHERE user_id = $1 ORDER BY created_at ASC ''', user_id ) # Преобразуем строки JSON обратно в словари return [json.loads(row['message_data']) for row in rows] except Exception as e: logging.error(f"Ошибка загрузки сообщений из БД: {e}") return [] async def clear_messages_from_db(user_id: int): """Очистка сообщений пользователя из базы данных""" if not postgres_pool: logging.error("Пул соединений PostgreSQL не инициализирован") return try: async with postgres_pool.acquire() as conn: await conn.execute( ''' DELETE FROM messages WHERE user_id = $1 ''', user_id ) except Exception as e: logging.error(f"Ошибка очистки сообщений из БД: {e}") async def keep_typing(chat_id: int): """Функция для поддержания статуса 'печатает...' до окончания генерации ответа""" try: while True: await bot.send_chat_action(chat_id=chat_id, action="typing") await asyncio.sleep(4) # Обновляем каждые 4 секунды (статус активен 5 секунд) except asyncio.CancelledError: # Задача отменена, завершаем работу pass except Exception as e: logging.error(f"Ошибка в keep_typing: {e}") def escape_markdown(text: str) -> str: """Не экранирует символы, оставляя оригинальное форматирование от ИИ""" return text async def split_and_send_message(message: Message, text: str, parse_mode=None, reply_markup=None): """Функция для разделения и отправки длинных сообщений""" # Если сообщение меньше максимальной длины, отправляем как есть if len(text) <= MAX_MESSAGE_LENGTH: return await message.answer(text, parse_mode=parse_mode, reply_markup=reply_markup) # Иначе разделяем сообщение chunks = textwrap.wrap(text, MAX_MESSAGE_LENGTH, replace_whitespace=False, drop_whitespace=False) # Отправляем первый кусок с клавиатурой (если есть) await message.answer(chunks[0], parse_mode=parse_mode) # Отправляем остальные куски с задержкой for chunk in chunks[1:]: await asyncio.sleep(SPLIT_MESSAGE_DELAY) await message.answer(chunk, parse_mode=parse_mode) # Отправляем последнее сообщение с клавиатурой (если есть) if reply_markup and len(chunks) > 1: await message.answer("⬆️ Полный ответ ⬆️", reply_markup=reply_markup) return None async def handle_message_with_retry(message: Message, text: str, keyboard=None): """Функция для отправки сообщения с повторной попыткой без форматирования при ошибке""" try: # Пытаемся отправить сообщение с форматированием Markdown await split_and_send_message(message, text, parse_mode=ParseMode.MARKDOWN, reply_markup=keyboard) except Exception as e: error_text = str(e) logging.error(f"Ошибка при отправке сообщения с форматированием: {error_text}") # Проверяем, связана ли ошибка с разбором сущностей (ошибка форматирования) if "can't parse entities" in error_text or "entities" in error_text: # Отправляем сообщение о проблеме с форматированием await message.answer("⚠️ Возникла проблема с форматированием сообщения. Отправляю без форматирования.") # Отправляем сообщение без форматирования await split_and_send_message(message, text, reply_markup=keyboard) else: # Если ошибка не связана с форматированием, просто пробрасываем её дальше raise e @dp.message(Command("start")) async def cmd_start(message: Message): """Обработчик команды /start""" user_id = message.from_user.id # Загружаем контекст из БД при первом запуске messages = await load_messages_from_db(user_id) if not messages: # Если сообщений нет, создаем новый список с начальным промтом user_messages[user_id] = [{"role": "system", "content": INITIAL_PROMPT}] else: # Если есть, сохраняем их в кэше user_messages[user_id] = messages welcome_message = ( "Привет! Я бот на базе Gemma. Отправьте мне сообщение или фото, и я отвечу вам.\n\n" "Поддерживается *Markdown* форматирование:\n" "• *жирный текст* (звездочки)\n" "• _курсив_ (нижнее подчеркивание)\n" "• `код` (обратные кавычки)\n" "• [ссылки](https://example.com)\n\n" "Используйте /clear для очистки контекста диалога." ) # Создаем инлайн-кнопку для очистки истории keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="🗑 Очистить историю сообщений", callback_data="clear_history")] ]) await message.answer(welcome_message, parse_mode=ParseMode.MARKDOWN, reply_markup=keyboard) @dp.message(Command("clear")) async def cmd_clear(message: Message): """Очистка контекста диалога""" user_id = message.from_user.id # Очищаем контекст в памяти и добавляем начальный промт user_messages[user_id] = [{"role": "system", "content": INITIAL_PROMPT}] # Очищаем контекст в БД await clear_messages_from_db(user_id) # Сохраняем начальный промт в БД await save_message_to_db(user_id, user_messages[user_id][0]) await message.answer("Контекст диалога очищен.") @dp.message(F.photo) async def handle_photo(message: Message): """Обработчик фотографий""" user_id = message.from_user.id # Инициализация контекста пользователя, если он еще не существует if user_id not in user_messages: # Пытаемся загрузить контекст из БД messages = await load_messages_from_db(user_id) if not messages: # Если нет сообщений, создаем начальный промт user_messages[user_id] = [{"role": "system", "content": INITIAL_PROMPT}] # Сохраняем начальный промт в БД await save_message_to_db(user_id, user_messages[user_id][0]) else: # Проверяем, есть ли начальный промт if not messages or messages[0].get("role") != "system": # Добавляем начальный промт в начало истории messages.insert(0, {"role": "system", "content": INITIAL_PROMPT}) # Сохраняем начальный промт в БД await save_message_to_db(user_id, messages[0]) user_messages[user_id] = messages # Получаем фото наилучшего качества photo = message.photo[-1] file_info = await bot.get_file(photo.file_id) file_content = await bot.download_file(file_info.file_path) # Конвертируем фото в Base64 image_data = file_content.read() image_io = BytesIO(image_data) # Получаем подпись к фото (если есть) caption = message.caption or "Проанализируй эту фотографию" # Создаем сообщение с изображением user_message = { "role": "user", "content": caption, "images": [image_io.getvalue()] } # Добавляем сообщение в контекст user_messages[user_id].append(user_message) # Сохраняем сообщение в БД await save_message_to_db(user_id, user_message) # Запускаем задачу обновления статуса печати typing_tasks[user_id] = asyncio.create_task(keep_typing(user_id)) try: # Получаем ответ от Gemma answer = "" print("AI stream: ", end="", flush=True) async for part in await ollama_client.chat( model=model_name, messages=user_messages[user_id], stream=True ): content = part['message']['content'] answer += content # Логируем поток от ИИ без переноса строки и с flush=True print(content, end="", flush=True) # Останавливаем задачу обновления статуса печати if user_id in typing_tasks: typing_tasks[user_id].cancel() del typing_tasks[user_id] # Создаем ответ ассистента assistant_message = {"role": "assistant", "content": answer} # Добавляем ответ в историю user_messages[user_id].append(assistant_message) # Сохраняем ответ в БД await save_message_to_db(user_id, assistant_message) # Создаем инлайн-кнопку для очистки истории keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="🗑 Очистить историю сообщений", callback_data="clear_history")] ]) # Отправляем ответ пользователю с Markdown-форматированием и кнопкой safe_answer = escape_markdown(answer) await handle_message_with_retry(message, safe_answer, keyboard) except Exception as e: # Останавливаем задачу обновления статуса печати в случае ошибки if user_id in typing_tasks: typing_tasks[user_id].cancel() del typing_tasks[user_id] logging.error(f"Ошибка при обработке фото: {e}") await message.answer(f"Произошла ошибка при обработке фото: {e}") @dp.message(F.text) async def handle_text(message: Message): """Обработчик текстовых сообщений""" user_id = message.from_user.id # Инициализация контекста пользователя, если он еще не существует if user_id not in user_messages: # Пытаемся загрузить контекст из БД messages = await load_messages_from_db(user_id) if not messages: # Если нет сообщений, создаем начальный промт user_messages[user_id] = [{"role": "system", "content": INITIAL_PROMPT}] # Сохраняем начальный промт в БД await save_message_to_db(user_id, user_messages[user_id][0]) else: # Проверяем, есть ли начальный промт if not messages or messages[0].get("role") != "system": # Добавляем начальный промт в начало истории messages.insert(0, {"role": "system", "content": INITIAL_PROMPT}) # Сохраняем начальный промт в БД await save_message_to_db(user_id, messages[0]) user_messages[user_id] = messages # Создаем сообщение пользователя user_message = {"role": "user", "content": message.text} # Добавляем сообщение пользователя в контекст user_messages[user_id].append(user_message) # Сохраняем сообщение в БД await save_message_to_db(user_id, user_message) # Запускаем задачу обновления статуса печати typing_tasks[user_id] = asyncio.create_task(keep_typing(user_id)) try: # Получаем ответ от Gemma answer = "" print("AI stream: ", end="", flush=True) async for part in await ollama_client.chat( model=model_name, messages=user_messages[user_id], stream=True ): content = part['message']['content'] answer += content # Логируем поток от ИИ без переноса строки и с flush=True print(content, end="", flush=True) # Останавливаем задачу обновления статуса печати if user_id in typing_tasks: typing_tasks[user_id].cancel() del typing_tasks[user_id] # Создаем ответ ассистента assistant_message = {"role": "assistant", "content": answer} # Добавляем ответ в историю user_messages[user_id].append(assistant_message) # Сохраняем ответ в БД await save_message_to_db(user_id, assistant_message) # Создаем инлайн-кнопку для очистки истории keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="🗑 Очистить историю сообщений", callback_data="clear_history")] ]) # Отправляем ответ пользователю с Markdown-форматированием и кнопкой safe_answer = escape_markdown(answer) await handle_message_with_retry(message, safe_answer, keyboard) except Exception as e: # Останавливаем задачу обновления статуса печати в случае ошибки if user_id in typing_tasks: typing_tasks[user_id].cancel() del typing_tasks[user_id] logging.error(f"Ошибка при обработке сообщения: {e}") await message.answer(f"Произошла ошибка при обработке сообщения: {e}") @dp.message() async def handle_other(message: Message): """Обработчик всех остальных типов сообщений""" await message.answer("Я могу обрабатывать только текст и фотографии. Пожалуйста, отправьте текст или одну фотографию.") # Добавляем обработчик нажатия на кнопку очистки истории @dp.callback_query(F.data == "clear_history") async def clear_history_callback(callback: types.CallbackQuery): """Обработчик кнопки очистки истории""" user_id = callback.from_user.id # Очищаем контекст в памяти и добавляем начальный промт user_messages[user_id] = [{"role": "system", "content": INITIAL_PROMPT}] # Очищаем контекст в БД await clear_messages_from_db(user_id) # Сохраняем начальный промт в БД await save_message_to_db(user_id, user_messages[user_id][0]) # Уведомляем пользователя await callback.answer("Контекст диалога очищен!") # Отправляем сообщение в чат await callback.message.answer("Контекст диалога очищен. Вы можете начать новый разговор.") async def main(): """Запуск бота""" # Инициализируем подключение к базе данных await init_db() try: # Запускаем бота await dp.start_polling(bot) finally: # Закрываем пул соединений при завершении if postgres_pool: await postgres_pool.close() if __name__ == "__main__": asyncio.run(main())