From a0564e4f8ffe92b9ce221062fe0cf471969b938f Mon Sep 17 00:00:00 2001 From: frosty Date: Fri, 9 Jan 2026 11:52:16 -0500 Subject: init --- main.py | 275 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 main.py (limited to 'main.py') diff --git a/main.py b/main.py new file mode 100644 index 0000000..c676344 --- /dev/null +++ b/main.py @@ -0,0 +1,275 @@ +import discord +from discord.ext import commands +import asyncio +import io +from PIL import Image +import socket +import json +import time +import re +import subprocess +import os +import logging + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("vm_bot.log"), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +def load_config(): + try: + if os.path.exists('config.json'): + with open('config.json', 'r') as f: + return json.load(f) + return { + 'qmp_host': os.getenv('QMP_HOST', '127.0.0.1'), + 'qmp_port': int(os.getenv('QMP_PORT', 4444)), + 'discord_token': os.getenv('DISCORD_TOKEN'), + 'command_channel_id': int(os.getenv('COMMAND_CHANNEL_ID', 0)), + 'qemu_command': os.getenv('QEMU_COMMAND', '').split(' ') + } + except Exception as e: + logger.error(f"Config error: {e}") + raise + +CONFIG = load_config() + +class VMController: + def __init__(self): + self.qmp_host = CONFIG['qmp_host'] + self.qmp_port = CONFIG['qmp_port'] + self.process = None + self.mouse_x = 16384 + self.mouse_y = 16384 + self.key_map = { + 'enter': 'ret', 'ret': 'ret', 'return': 'ret', + 'spc': 'spc', 'space': 'spc', ' ': 'spc', + 'tab': 'tab', '\t': 'tab', + 'backspace': 'backspace', 'bsp': 'backspace', + 'esc': 'esc', 'escape': 'esc', + 'up': 'up', 'down': 'down', 'left': 'left', 'right': 'right', + 'del': 'delete', 'delete': 'delete', + 'pgup': 'pgup', 'pgdn': 'pgdn', + 'ctrl': 'ctrl', 'alt': 'alt', 'shift': 'shift', 'meta': 'meta_l', + 'win': 'meta_l', 'cmd': 'meta_l', + '.': 'dot', ',': 'comma', '-': 'minus', '/': 'slash', + ';': 'semicolon', "'": 'apostrophe', '[': 'bracket_left', + ']': 'bracket_right', '\\': 'backslash', '`': 'grave_accent', '=': 'equal' + } + + def start_vm(self): + if self.process and self.process.poll() is None: + return False + self.process = subprocess.Popen(CONFIG['qemu_command']) + return True + + def stop_vm(self): + if self.process: + self.process.terminate() + self.process.wait() + self.process = None + + def qmp_command(self, command_dict): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(3.0) + sock.connect((self.qmp_host, self.qmp_port)) + sock.recv(4096) + sock.send(b'{"execute": "qmp_capabilities"}\n') + sock.recv(4096) + sock.send((json.dumps(command_dict) + '\n').encode()) + response = sock.recv(4096).decode().strip() + sock.close() + resp_data = json.loads(response) + if "error" in resp_data: + logger.error(f"QMP Error: {resp_data['error']['desc']}") + return False + return True + except Exception as e: + logger.error(f"QMP Socket Error: {e}") + return False + + def send_raw_key(self, key_name): + keys_to_send = [] + for p in key_name.split('+'): + p = p.strip().lower() + qcode = self.key_map.get(p, p) + keys_to_send.append({"type": "qcode", "data": qcode}) + + return self.qmp_command({ + "execute": "send-key", + "arguments": {"keys": keys_to_send} + }) + + def send_mouse_move(self, dx=0, dy=0): + scale = 32 + self.mouse_x = max(0, min(32767, self.mouse_x + (dx * scale))) + self.mouse_y = max(0, min(32767, self.mouse_y + (dy * scale))) + return self.qmp_command({ + "execute": "input-send-event", + "arguments": { + "events": [ + {"type": "abs", "data": {"axis": "x", "value": self.mouse_x}}, + {"type": "abs", "data": {"axis": "y", "value": self.mouse_y}} + ] + } + }) + + def send_mouse_button(self, button='left', down=True): + return self.qmp_command({ + "execute": "input-send-event", + "arguments": { + "events": [{"type": "btn", "data": {"button": button, "down": down}}] + } + }) + + def send_literal_text(self, text): + shift_map = { + '!': '1', '@': '2', '#': '3', '$': '4', '%': '5', + '^': '6', '&': '7', '*': '8', '(': '9', ')': '0', + '_': 'minus', '+': 'equal', '{': 'bracket_left', + '}': 'bracket_right', '|': 'backslash', ':': 'semicolon', + '"': 'apostrophe', '<': 'comma', '>': 'dot', '?': 'slash', + '~': 'grave_accent' + } + for char in text: + if char == ' ': + self.send_raw_key('spc') + elif char in shift_map: + self.qmp_command({ + "execute": "send-key", + "arguments": { + "keys": [{"type": "qcode", "data": "shift"}, {"type": "qcode", "data": shift_map[char]}] + } + }) + elif char.isupper(): + self.qmp_command({ + "execute": "send-key", + "arguments": { + "keys": [{"type": "qcode", "data": "shift"}, {"type": "qcode", "data": char.lower()}] + } + }) + else: + self.send_raw_key(char) + time.sleep(0.01) + + def parse_and_execute(self, message_content): + parts = re.findall(r'(\[[^\]]+\]|[^\[]+)', message_content) + for part in parts: + if part.startswith('[') and part.endswith(']'): + content = part[1:-1].lower().strip() + if content == 'mreset': + self.mouse_x = 16384 + self.mouse_y = 16384 + self.send_mouse_move(0, 0) + elif content.startswith('mx,') or content.startswith('my,'): + try: + axis, val = content.split(',') + val = int(val) + if axis == 'mx': self.send_mouse_move(dx=val, dy=0) + else: self.send_mouse_move(dx=0, dy=val) + except ValueError: continue + elif content.startswith('mclick'): + btn = content.split(',')[1].strip() if ',' in content else 'left' + self.send_mouse_button(btn, True) + time.sleep(0.05) + self.send_mouse_button(btn, False) + else: + count = 1 + cmd = content + if '^' in content: + try: + cmd, count_str = content.rsplit('^', 1) + count = int(count_str) + except ValueError: pass + for _ in range(count): + self.send_raw_key(cmd.strip()) + time.sleep(0.05) + else: + self.send_literal_text(part) + return True + + def take_screenshot(self): + try: + path = "/tmp/vm_dump.ppm" + if os.path.exists(path): os.remove(path) + if not self.qmp_command({"execute": "screendump", "arguments": {"filename": path}}): + return None + for _ in range(15): + if os.path.exists(path) and os.path.getsize(path) > 0: break + time.sleep(0.1) + img = Image.open(path) + output = io.BytesIO() + img.save(output, format='PNG') + output.seek(0) + return output + except Exception as e: + logger.error(f"Screenshot Error: {e}") + return None + +intents = discord.Intents.default() +intents.message_content = True +intents.voice_states = True +bot = commands.Bot(command_prefix="!", intents=intents) +vm = VMController() + +def is_authorized(member): + if not isinstance(member, discord.Member): return False + has_role = any(role.name.lower() == "doas" for role in member.roles) + return has_role and member.voice and member.voice.channel and not (member.voice.self_deaf or member.voice.deaf) + +@bot.event +async def on_ready(): + logger.info(f'Bot active: {bot.user}') + vm.start_vm() + +@bot.event +async def on_message(message): + if message.author == bot.user or message.channel.id != CONFIG['command_channel_id'] or not is_authorized(message.author): + return + + if message.content.startswith('!'): + await bot.process_commands(message) + return + + content = message.content + if content.endswith(';e'): + content = content[:-2] + '[enter]' + + async with message.channel.typing(): + if await asyncio.to_thread(vm.parse_and_execute, content): + await asyncio.sleep(0.8) + shot = await asyncio.to_thread(vm.take_screenshot) + if shot: + await message.reply(file=discord.File(shot, filename='preview.png')) + +@bot.command(name='s') +async def say_text(ctx, *, text: str): + if not is_authorized(ctx.author): return + await ctx.message.add_reaction("💬") + +@bot.command(name='reboot') +async def reboot_vm(ctx): + if not is_authorized(ctx.author): return + vm.stop_vm() + await asyncio.sleep(1) + vm.start_vm() + await ctx.send("VM Restarted.") + +@bot.command(name='screenshot') +async def just_screenshot(ctx): + if not is_authorized(ctx.author): return + shot = await asyncio.to_thread(vm.take_screenshot) + if shot: + await ctx.send(file=discord.File(shot, filename='vm.png')) + +if __name__ == "__main__": + try: + bot.run(CONFIG['discord_token']) + finally: + vm.stop_vm() -- cgit v1.2.3