Implemented Live Macro Window Detection

This commit is contained in:
2025-06-07 00:05:21 -06:00
parent 5927403f12
commit c9e3e67f20
4 changed files with 427 additions and 86 deletions

View File

@ -10,6 +10,7 @@ from functools import partial
from io import BytesIO
import base64
import traceback
import random # Macro Randomization
import platform # OS Detection
import importlib.util
@ -338,7 +339,6 @@ class ScreenshotRegion(QtWidgets.QWidget):
async def screenshot_task(cfg):
nid=cfg.get('node_id')
alias=cfg.get('alias','')
print(f"[DEBUG] Running screenshot_task for {nid}")
r=CONFIG.data['regions'].get(nid)
if r:
region=(r['x'],r['y'],r['w'],r['h'])
@ -369,40 +369,116 @@ async def screenshot_task(cfg):
# ---------------- Macro Task ----------------
async def macro_task(cfg):
"""
Improved macro_task supporting all operation modes, live config, error reporting, and UI feedback.
"""
nid = cfg.get('node_id')
window_handle = cfg.get('window_handle')
mode = cfg.get('operation_mode', 'keypress') # 'keypress' or 'typed_text'
key = cfg.get('key')
text = cfg.get('text')
interval_ms = int(cfg.get('interval_ms', 1000))
randomize = cfg.get('randomize_interval', False)
random_min = int(cfg.get('random_min', 750))
random_max = int(cfg.get('random_max', 950))
active = cfg.get('active', True) # Whether macro is "started" or "paused"
print(f"[DEBUG] Macro task for node {nid} started on window {window_handle}")
import random
try:
while True:
if not active:
await asyncio.sleep(0.2)
continue
if mode == 'keypress' and key:
macro_engines.send_keypress_to_window(window_handle, key)
elif mode == 'typed_text' and text:
macro_engines.type_text_to_window(window_handle, text)
# Interval logic
if randomize:
ms = random.randint(random_min, random_max)
# Track trigger state for edge/level changes
last_trigger_value = 0
has_run_once = False
while True:
# Always re-fetch config (hot reload support)
# (In reality, you might want to deep-copy or re-provision on config update, but for MVP we refetch each tick)
window_handle = cfg.get('window_handle')
macro_type = cfg.get('macro_type', 'keypress') # Now matches UI config
operation_mode = cfg.get('operation_mode', 'Continuous')
key = cfg.get('key')
text = cfg.get('text')
interval_ms = int(cfg.get('interval_ms', 1000))
randomize = cfg.get('randomize_interval', False)
random_min = int(cfg.get('random_min', 750))
random_max = int(cfg.get('random_max', 950))
active = cfg.get('active', True)
trigger = int(cfg.get('trigger', 0)) # For trigger modes; default 0 if not set
# Define helper for error reporting
async def emit_macro_status(success, message=""):
await sio.emit('macro_status', {
"agent_id": AGENT_ID,
"node_id": nid,
"success": success,
"message": message,
"timestamp": int(asyncio.get_event_loop().time() * 1000)
})
# Stopped state (paused from UI)
if not (active is True or str(active).lower() == "true"):
await asyncio.sleep(0.2)
continue
try:
send_macro = False
# Operation Mode Logic
if operation_mode == "Run Once":
if not has_run_once:
send_macro = True
has_run_once = True # Only run once, then stop
elif operation_mode == "Continuous":
send_macro = True # Always run every interval
elif operation_mode == "Trigger-Continuous":
# Only run while trigger is "1"
if trigger == 1:
send_macro = True
else:
send_macro = False
elif operation_mode == "Trigger-Once":
# Run only on rising edge: 0->1
if last_trigger_value == 0 and trigger == 1:
send_macro = True
else:
send_macro = False
last_trigger_value = trigger
else:
ms = interval_ms
await asyncio.sleep(ms / 1000.0)
except asyncio.CancelledError:
print(f"[TASK] Macro role {nid} cancelled.")
except Exception as e:
print(f"[ERROR] Macro task {nid} failed: {e}")
import traceback
traceback.print_exc()
# Unknown mode: default to "Continuous"
send_macro = True
if send_macro:
# Actually perform macro
if macro_type == 'keypress' and key:
result = macro_engines.send_keypress_to_window(window_handle, key)
elif macro_type == 'typed_text' and text:
result = macro_engines.type_text_to_window(window_handle, text)
else:
await emit_macro_status(False, "Invalid macro type or missing key/text")
await asyncio.sleep(0.2)
continue
# Result may be True or (False, error)
if isinstance(result, tuple):
success, err = result
else:
success, err = bool(result), ""
if success:
await emit_macro_status(True, f"Macro sent: {macro_type}")
else:
await emit_macro_status(False, err or "Unknown macro engine failure")
else:
# No macro to send this cycle, just idle
await asyncio.sleep(0.05)
# Timing: only wait if we did send macro this tick
if send_macro:
if randomize:
ms = random.randint(random_min, random_max)
else:
ms = interval_ms
await asyncio.sleep(ms / 1000.0)
else:
await asyncio.sleep(0.1) # No macro action: check again soon
except asyncio.CancelledError:
print(f"[TASK] Macro role {nid} cancelled.")
break
except Exception as e:
print(f"[ERROR] Macro task {nid} failed: {e}")
import traceback
traceback.print_exc()
await emit_macro_status(False, str(e))
await asyncio.sleep(0.5)
# ---------------- Config Watcher ----------------
async def config_watcher():