aboutsummaryrefslogtreecommitdiff
path: root/main.py
diff options
context:
space:
mode:
authorfrosty <frosty@illegalfirearms.store>2026-01-09 11:52:16 -0500
committerfrosty <frosty@illegalfirearms.store>2026-01-09 11:52:16 -0500
commita0564e4f8ffe92b9ce221062fe0cf471969b938f (patch)
treed677cbf5c08db7d6c4c77ce56aba80cabee797fb /main.py
init
Diffstat (limited to 'main.py')
-rw-r--r--main.py275
1 files changed, 275 insertions, 0 deletions
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..c676344
--- /dev/null
+++ b/main.py
@@ -0,0 +1,275 @@
+import discord
+from discord.ext import commands
+import asyncio
+import io
+from PIL import Image
+import socket
+import json
+import time
+import re
+import subprocess
+import os
+import logging
+
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ handlers=[
+ logging.FileHandler("vm_bot.log"),
+ logging.StreamHandler()
+ ]
+)
+logger = logging.getLogger(__name__)
+
+def load_config():
+ try:
+ if os.path.exists('config.json'):
+ with open('config.json', 'r') as f:
+ return json.load(f)
+ return {
+ 'qmp_host': os.getenv('QMP_HOST', '127.0.0.1'),
+ 'qmp_port': int(os.getenv('QMP_PORT', 4444)),
+ 'discord_token': os.getenv('DISCORD_TOKEN'),
+ 'command_channel_id': int(os.getenv('COMMAND_CHANNEL_ID', 0)),
+ 'qemu_command': os.getenv('QEMU_COMMAND', '').split(' ')
+ }
+ except Exception as e:
+ logger.error(f"Config error: {e}")
+ raise
+
+CONFIG = load_config()
+
+class VMController:
+ def __init__(self):
+ self.qmp_host = CONFIG['qmp_host']
+ self.qmp_port = CONFIG['qmp_port']
+ self.process = None
+ self.mouse_x = 16384
+ self.mouse_y = 16384
+ self.key_map = {
+ 'enter': 'ret', 'ret': 'ret', 'return': 'ret',
+ 'spc': 'spc', 'space': 'spc', ' ': 'spc',
+ 'tab': 'tab', '\t': 'tab',
+ 'backspace': 'backspace', 'bsp': 'backspace',
+ 'esc': 'esc', 'escape': 'esc',
+ 'up': 'up', 'down': 'down', 'left': 'left', 'right': 'right',
+ 'del': 'delete', 'delete': 'delete',
+ 'pgup': 'pgup', 'pgdn': 'pgdn',
+ 'ctrl': 'ctrl', 'alt': 'alt', 'shift': 'shift', 'meta': 'meta_l',
+ 'win': 'meta_l', 'cmd': 'meta_l',
+ '.': 'dot', ',': 'comma', '-': 'minus', '/': 'slash',
+ ';': 'semicolon', "'": 'apostrophe', '[': 'bracket_left',
+ ']': 'bracket_right', '\\': 'backslash', '`': 'grave_accent', '=': 'equal'
+ }
+
+ def start_vm(self):
+ if self.process and self.process.poll() is None:
+ return False
+ self.process = subprocess.Popen(CONFIG['qemu_command'])
+ return True
+
+ def stop_vm(self):
+ if self.process:
+ self.process.terminate()
+ self.process.wait()
+ self.process = None
+
+ def qmp_command(self, command_dict):
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(3.0)
+ sock.connect((self.qmp_host, self.qmp_port))
+ sock.recv(4096)
+ sock.send(b'{"execute": "qmp_capabilities"}\n')
+ sock.recv(4096)
+ sock.send((json.dumps(command_dict) + '\n').encode())
+ response = sock.recv(4096).decode().strip()
+ sock.close()
+ resp_data = json.loads(response)
+ if "error" in resp_data:
+ logger.error(f"QMP Error: {resp_data['error']['desc']}")
+ return False
+ return True
+ except Exception as e:
+ logger.error(f"QMP Socket Error: {e}")
+ return False
+
+ def send_raw_key(self, key_name):
+ keys_to_send = []
+ for p in key_name.split('+'):
+ p = p.strip().lower()
+ qcode = self.key_map.get(p, p)
+ keys_to_send.append({"type": "qcode", "data": qcode})
+
+ return self.qmp_command({
+ "execute": "send-key",
+ "arguments": {"keys": keys_to_send}
+ })
+
+ def send_mouse_move(self, dx=0, dy=0):
+ scale = 32
+ self.mouse_x = max(0, min(32767, self.mouse_x + (dx * scale)))
+ self.mouse_y = max(0, min(32767, self.mouse_y + (dy * scale)))
+ return self.qmp_command({
+ "execute": "input-send-event",
+ "arguments": {
+ "events": [
+ {"type": "abs", "data": {"axis": "x", "value": self.mouse_x}},
+ {"type": "abs", "data": {"axis": "y", "value": self.mouse_y}}
+ ]
+ }
+ })
+
+ def send_mouse_button(self, button='left', down=True):
+ return self.qmp_command({
+ "execute": "input-send-event",
+ "arguments": {
+ "events": [{"type": "btn", "data": {"button": button, "down": down}}]
+ }
+ })
+
+ def send_literal_text(self, text):
+ shift_map = {
+ '!': '1', '@': '2', '#': '3', '$': '4', '%': '5',
+ '^': '6', '&': '7', '*': '8', '(': '9', ')': '0',
+ '_': 'minus', '+': 'equal', '{': 'bracket_left',
+ '}': 'bracket_right', '|': 'backslash', ':': 'semicolon',
+ '"': 'apostrophe', '<': 'comma', '>': 'dot', '?': 'slash',
+ '~': 'grave_accent'
+ }
+ for char in text:
+ if char == ' ':
+ self.send_raw_key('spc')
+ elif char in shift_map:
+ self.qmp_command({
+ "execute": "send-key",
+ "arguments": {
+ "keys": [{"type": "qcode", "data": "shift"}, {"type": "qcode", "data": shift_map[char]}]
+ }
+ })
+ elif char.isupper():
+ self.qmp_command({
+ "execute": "send-key",
+ "arguments": {
+ "keys": [{"type": "qcode", "data": "shift"}, {"type": "qcode", "data": char.lower()}]
+ }
+ })
+ else:
+ self.send_raw_key(char)
+ time.sleep(0.01)
+
+ def parse_and_execute(self, message_content):
+ parts = re.findall(r'(\[[^\]]+\]|[^\[]+)', message_content)
+ for part in parts:
+ if part.startswith('[') and part.endswith(']'):
+ content = part[1:-1].lower().strip()
+ if content == 'mreset':
+ self.mouse_x = 16384
+ self.mouse_y = 16384
+ self.send_mouse_move(0, 0)
+ elif content.startswith('mx,') or content.startswith('my,'):
+ try:
+ axis, val = content.split(',')
+ val = int(val)
+ if axis == 'mx': self.send_mouse_move(dx=val, dy=0)
+ else: self.send_mouse_move(dx=0, dy=val)
+ except ValueError: continue
+ elif content.startswith('mclick'):
+ btn = content.split(',')[1].strip() if ',' in content else 'left'
+ self.send_mouse_button(btn, True)
+ time.sleep(0.05)
+ self.send_mouse_button(btn, False)
+ else:
+ count = 1
+ cmd = content
+ if '^' in content:
+ try:
+ cmd, count_str = content.rsplit('^', 1)
+ count = int(count_str)
+ except ValueError: pass
+ for _ in range(count):
+ self.send_raw_key(cmd.strip())
+ time.sleep(0.05)
+ else:
+ self.send_literal_text(part)
+ return True
+
+ def take_screenshot(self):
+ try:
+ path = "/tmp/vm_dump.ppm"
+ if os.path.exists(path): os.remove(path)
+ if not self.qmp_command({"execute": "screendump", "arguments": {"filename": path}}):
+ return None
+ for _ in range(15):
+ if os.path.exists(path) and os.path.getsize(path) > 0: break
+ time.sleep(0.1)
+ img = Image.open(path)
+ output = io.BytesIO()
+ img.save(output, format='PNG')
+ output.seek(0)
+ return output
+ except Exception as e:
+ logger.error(f"Screenshot Error: {e}")
+ return None
+
+intents = discord.Intents.default()
+intents.message_content = True
+intents.voice_states = True
+bot = commands.Bot(command_prefix="!", intents=intents)
+vm = VMController()
+
+def is_authorized(member):
+ if not isinstance(member, discord.Member): return False
+ has_role = any(role.name.lower() == "doas" for role in member.roles)
+ return has_role and member.voice and member.voice.channel and not (member.voice.self_deaf or member.voice.deaf)
+
+@bot.event
+async def on_ready():
+ logger.info(f'Bot active: {bot.user}')
+ vm.start_vm()
+
+@bot.event
+async def on_message(message):
+ if message.author == bot.user or message.channel.id != CONFIG['command_channel_id'] or not is_authorized(message.author):
+ return
+
+ if message.content.startswith('!'):
+ await bot.process_commands(message)
+ return
+
+ content = message.content
+ if content.endswith(';e'):
+ content = content[:-2] + '[enter]'
+
+ async with message.channel.typing():
+ if await asyncio.to_thread(vm.parse_and_execute, content):
+ await asyncio.sleep(0.8)
+ shot = await asyncio.to_thread(vm.take_screenshot)
+ if shot:
+ await message.reply(file=discord.File(shot, filename='preview.png'))
+
+@bot.command(name='s')
+async def say_text(ctx, *, text: str):
+ if not is_authorized(ctx.author): return
+ await ctx.message.add_reaction("💬")
+
+@bot.command(name='reboot')
+async def reboot_vm(ctx):
+ if not is_authorized(ctx.author): return
+ vm.stop_vm()
+ await asyncio.sleep(1)
+ vm.start_vm()
+ await ctx.send("VM Restarted.")
+
+@bot.command(name='screenshot')
+async def just_screenshot(ctx):
+ if not is_authorized(ctx.author): return
+ shot = await asyncio.to_thread(vm.take_screenshot)
+ if shot:
+ await ctx.send(file=discord.File(shot, filename='vm.png'))
+
+if __name__ == "__main__":
+ try:
+ bot.run(CONFIG['discord_token'])
+ finally:
+ vm.stop_vm()