import discord from discord.ext import commands import asyncio import io from PIL import Image import socket import json import time import re import subprocess import os import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("vm_bot.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def load_config(): try: if os.path.exists('config.json'): with open('config.json', 'r') as f: return json.load(f) return { 'qmp_host': os.getenv('QMP_HOST', '127.0.0.1'), 'qmp_port': int(os.getenv('QMP_PORT', 4444)), 'discord_token': os.getenv('DISCORD_TOKEN'), 'command_channel_id': int(os.getenv('COMMAND_CHANNEL_ID', 0)), 'qemu_command': os.getenv('QEMU_COMMAND', '').split(' ') } except Exception as e: logger.error(f"Config error: {e}") raise CONFIG = load_config() class VMController: def __init__(self): self.qmp_host = CONFIG['qmp_host'] self.qmp_port = CONFIG['qmp_port'] self.process = None self.mouse_x = 16384 self.mouse_y = 16384 self.key_map = { 'enter': 'ret', 'ret': 'ret', 'return': 'ret', 'spc': 'spc', 'space': 'spc', ' ': 'spc', 'tab': 'tab', '\t': 'tab', 'backspace': 'backspace', 'bsp': 'backspace', 'esc': 'esc', 'escape': 'esc', 'up': 'up', 'down': 'down', 'left': 'left', 'right': 'right', 'del': 'delete', 'delete': 'delete', 'pgup': 'pgup', 'pgdn': 'pgdn', 'ctrl': 'ctrl', 'alt': 'alt', 'shift': 'shift', 'meta': 'meta_l', 'win': 'meta_l', 'cmd': 'meta_l', '.': 'dot', ',': 'comma', '-': 'minus', '/': 'slash', ';': 'semicolon', "'": 'apostrophe', '[': 'bracket_left', ']': 'bracket_right', '\\': 'backslash', '`': 'grave_accent', '=': 'equal' } def start_vm(self): if self.process and self.process.poll() is None: return False self.process = subprocess.Popen(CONFIG['qemu_command']) return True def stop_vm(self): if self.process: self.process.terminate() self.process.wait() self.process = None def qmp_command(self, command_dict): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(3.0) sock.connect((self.qmp_host, self.qmp_port)) sock.recv(4096) sock.send(b'{"execute": "qmp_capabilities"}\n') sock.recv(4096) sock.send((json.dumps(command_dict) + '\n').encode()) response = sock.recv(4096).decode().strip() sock.close() resp_data = json.loads(response) if "error" in resp_data: logger.error(f"QMP Error: {resp_data['error']['desc']}") return False return True except Exception as e: logger.error(f"QMP Socket Error: {e}") return False def send_raw_key(self, key_name): keys_to_send = [] for p in key_name.split('+'): p = p.strip().lower() qcode = self.key_map.get(p, p) keys_to_send.append({"type": "qcode", "data": qcode}) return self.qmp_command({ "execute": "send-key", "arguments": {"keys": keys_to_send} }) def send_mouse_move(self, dx=0, dy=0): scale = 32 self.mouse_x = max(0, min(32767, self.mouse_x + (dx * scale))) self.mouse_y = max(0, min(32767, self.mouse_y + (dy * scale))) return self.qmp_command({ "execute": "input-send-event", "arguments": { "events": [ {"type": "abs", "data": {"axis": "x", "value": self.mouse_x}}, {"type": "abs", "data": {"axis": "y", "value": self.mouse_y}} ] } }) def send_mouse_button(self, button='left', down=True): return self.qmp_command({ "execute": "input-send-event", "arguments": { "events": [{"type": "btn", "data": {"button": button, "down": down}}] } }) def send_literal_text(self, text): shift_map = { '!': '1', '@': '2', '#': '3', '$': '4', '%': '5', '^': '6', '&': '7', '*': '8', '(': '9', ')': '0', '_': 'minus', '+': 'equal', '{': 'bracket_left', '}': 'bracket_right', '|': 'backslash', ':': 'semicolon', '"': 'apostrophe', '<': 'comma', '>': 'dot', '?': 'slash', '~': 'grave_accent' } for char in text: if char == ' ': self.send_raw_key('spc') elif char in shift_map: self.qmp_command({ "execute": "send-key", "arguments": { "keys": [{"type": "qcode", "data": "shift"}, {"type": "qcode", "data": shift_map[char]}] } }) elif char.isupper(): self.qmp_command({ "execute": "send-key", "arguments": { "keys": [{"type": "qcode", "data": "shift"}, {"type": "qcode", "data": char.lower()}] } }) else: self.send_raw_key(char) time.sleep(0.01) def parse_and_execute(self, message_content): parts = re.findall(r'(\[[^\]]+\]|[^\[]+)', message_content) for part in parts: if part.startswith('[') and part.endswith(']'): content = part[1:-1].lower().strip() if content == 'mreset': self.mouse_x = 16384 self.mouse_y = 16384 self.send_mouse_move(0, 0) elif content.startswith('mx,') or content.startswith('my,'): try: axis, val = content.split(',') val = int(val) if axis == 'mx': self.send_mouse_move(dx=val, dy=0) else: self.send_mouse_move(dx=0, dy=val) except ValueError: continue elif content.startswith('mclick'): btn = content.split(',')[1].strip() if ',' in content else 'left' self.send_mouse_button(btn, True) time.sleep(0.05) self.send_mouse_button(btn, False) else: count = 1 cmd = content if '^' in content: try: cmd, count_str = content.rsplit('^', 1) count = int(count_str) except ValueError: pass for _ in range(count): self.send_raw_key(cmd.strip()) time.sleep(0.05) else: self.send_literal_text(part) return True def take_screenshot(self): try: path = "/tmp/vm_dump.ppm" if os.path.exists(path): os.remove(path) if not self.qmp_command({"execute": "screendump", "arguments": {"filename": path}}): return None for _ in range(15): if os.path.exists(path) and os.path.getsize(path) > 0: break time.sleep(0.1) img = Image.open(path) output = io.BytesIO() img.save(output, format='PNG') output.seek(0) return output except Exception as e: logger.error(f"Screenshot Error: {e}") return None intents = discord.Intents.default() intents.message_content = True intents.voice_states = True bot = commands.Bot(command_prefix="!", intents=intents) vm = VMController() def is_authorized(member): if not isinstance(member, discord.Member): return False has_role = any(role.name.lower() == "doas" for role in member.roles) return has_role and member.voice and member.voice.channel and not (member.voice.self_deaf or member.voice.deaf) @bot.event async def on_ready(): logger.info(f'Bot active: {bot.user}') vm.start_vm() @bot.event async def on_message(message): if message.author == bot.user or message.channel.id != CONFIG['command_channel_id'] or not is_authorized(message.author): return if message.content.startswith('!'): await bot.process_commands(message) return content = message.content if content.endswith(';e'): content = content[:-2] + '[enter]' async with message.channel.typing(): if await asyncio.to_thread(vm.parse_and_execute, content): await asyncio.sleep(0.8) shot = await asyncio.to_thread(vm.take_screenshot) if shot: await message.reply(file=discord.File(shot, filename='preview.png')) @bot.command(name='s') async def say_text(ctx, *, text: str): if not is_authorized(ctx.author): return await ctx.message.add_reaction("💬") @bot.command(name='reboot') async def reboot_vm(ctx): if not is_authorized(ctx.author): return vm.stop_vm() await asyncio.sleep(1) vm.start_vm() await ctx.send("VM Restarted.") @bot.command(name='screenshot') async def just_screenshot(ctx): if not is_authorized(ctx.author): return shot = await asyncio.to_thread(vm.take_screenshot) if shot: await ctx.send(file=discord.File(shot, filename='vm.png')) if __name__ == "__main__": try: bot.run(CONFIG['discord_token']) finally: vm.stop_vm()