# Borealis-Legacy/data_collector_v2.py
# (327 lines, 11 KiB, Python)
#!/usr/bin/env python3
import time
import re
import threading
import numpy as np
import cv2
import pytesseract
from flask import Flask, jsonify
from PIL import Image, ImageGrab, ImageFilter
from PyQt5.QtWidgets import QApplication, QWidget
from PyQt5.QtCore import QTimer, QRect, QPoint, Qt, QMutex
from PyQt5.QtGui import QPainter, QPen, QColor, QFont
# Tesseract is not on PATH by default on Windows, so point pytesseract at the
# standard install location.
# NOTE(review): hard-coded path — breaks on machines that installed
# Tesseract elsewhere; consider making this configurable.
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
# =============================================================================
# Global Config
# =============================================================================
POLLING_RATE_MS = 500    # OCR polling cadence, in milliseconds
MAX_DATA_POINTS = 8      # not referenced in this file — presumably a history cap; verify
DEFAULT_WIDTH = 180      # initial capture-region width (px)
DEFAULT_HEIGHT = 130     # initial capture-region height (px)
HANDLE_SIZE = 8          # edge length of the square resize handles (px)
LABEL_HEIGHT = 20        # strip reserved above a region for its label (px)
# Minimum normalized-correlation score accepted by cv2.matchTemplate lookups.
MATCH_THRESHOLD = 0.4 # typical value for the TM_CCOEFF_NORMED correlation threshold
# Flask API setup
app = Flask(__name__)
# Shared capture-region geometry: mutated in place by the Qt overlay and read
# by the OCR thread; access is synchronized through region_lock.
region_lock = QMutex() # Mutex to synchronize access between UI and OCR thread
shared_region = {
    "x": 250,
    "y": 50,
    "w": DEFAULT_WIDTH,
    "h": DEFAULT_HEIGHT
}
# Latest parsed OCR readings; rebound wholesale by collect_ocr_data() and
# served verbatim by the /data endpoint.
latest_data = {
    "hp_current": 0,
    "hp_total": 0,
    "mp_current": 0,
    "mp_total": 0,
    "fp_current": 0,
    "fp_total": 0,
    "exp": 0.0000
}
# =============================================================================
# OCR Data Collection
# =============================================================================
def preprocess_image(image):
    """
    Prepare a PIL image for OCR.

    Converts to grayscale, upscales 3x (Tesseract reads small text more
    reliably at higher resolution), binarizes with a fixed threshold, and
    median-filters to drop salt-and-pepper noise.

    Args:
        image: source PIL.Image (screenshot of the capture region).

    Returns:
        A new preprocessed PIL.Image in mode "L".
    """
    gray = image.convert("L")
    # Upscale before thresholding so glyph edges survive binarization.
    scaled = gray.resize((gray.width * 3, gray.height * 3))
    # Binarize: pixels brighter than 200 become white, everything else black.
    # (Explicit conditional replaces the original `p > 200 and 255` trick,
    # which relied on False coercing to 0.)
    thresh = scaled.point(lambda p: 255 if p > 200 else 0)
    return thresh.filter(ImageFilter.MedianFilter(3))
def sanitize_experience_string(raw_text):
    """
    Extract an EXP percentage from a raw OCR text line.

    Strips '%' signs, spaces, and any other non-numeric characters, then
    parses the first decimal number found.

    Args:
        raw_text: raw OCR line, e.g. "EXP 42.1573%".

    Returns:
        The value clamped to [0, 100] and rounded to 4 decimal places,
        or None if no number could be found.
    """
    cleaned = re.sub(r'[^0-9\.]', '', raw_text.replace('%', '').replace(' ', ''))
    match = re.search(r'\d+(?:\.\d+)?', cleaned)
    if not match:
        return None
    # The pattern cannot match a negative number (the '-' is stripped above),
    # so only the upper bound needs clamping; the original `val < 0` branch
    # was dead code.
    val = min(float(match.group(0)), 100.0)
    return round(val, 4)
def locate_bars_opencv(template_path, threshold=MATCH_THRESHOLD):
    """
    Search the full screen for a template image via OpenCV matching.

    Args:
        template_path: path to the template image file on disk.
        threshold: minimum TM_CCOEFF_NORMED score to accept a match.

    Returns:
        (x, y, width, height) of the best match, or None when the template
        file cannot be read or no location scores at/above the threshold.
    """
    screenshot = ImageGrab.grab()
    # PIL delivers RGB; OpenCV expects BGR channel order.
    screen_bgr = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)
    template = cv2.imread(template_path, cv2.IMREAD_COLOR)
    if template is None:
        return None
    scores = cv2.matchTemplate(screen_bgr, template, cv2.TM_CCOEFF_NORMED)
    _, best_score, _, best_loc = cv2.minMaxLoc(scores)
    if best_score < threshold:
        return None
    height, width = template.shape[:2]
    return (best_loc[0], best_loc[1], width, height)
def parse_all_stats(raw_text):
    """
    Parse HP/MP/FP/EXP values out of a block of OCR text.

    Expects at least four non-empty lines in order: HP, MP, FP (each a
    "current / total" pair) and an EXP percentage line.

    Args:
        raw_text: multi-line OCR output.

    Returns:
        dict with keys "hp"/"mp"/"fp" mapped to (current, total) int tuples
        ((0, 1) when a line fails to parse) and "exp" mapped to a float
        percentage or None. With fewer than four non-empty lines, the
        untouched defaults are returned.
    """
    lines = [line.strip() for line in raw_text.splitlines() if line.strip()]
    stats_dict = {
        "hp": (0, 1),
        "mp": (0, 1),
        "fp": (0, 1),
        "exp": None
    }
    if len(lines) < 4:
        return stats_dict
    # Lines 0-2 carry "current / total" pairs for HP, MP, FP respectively;
    # one compiled pattern replaces the original three copy-pasted branches.
    pair_re = re.compile(r"(\d+)\s*/\s*(\d+)")
    for idx, key in enumerate(("hp", "mp", "fp")):
        match = pair_re.search(lines[idx])
        if match:
            stats_dict[key] = (int(match.group(1)), int(match.group(2)))
    stats_dict["exp"] = sanitize_experience_string(lines[3])
    return stats_dict
# =============================================================================
# Region & UI
# =============================================================================
class Region:
    """
    A draggable/resizable rectangular screen area used for OCR capture.
    """
    def __init__(self, x, y, label="Region", color=QColor(0, 0, 255)):
        # Geometry starts at the configured default capture size.
        self.x = x
        self.y = y
        self.w = DEFAULT_WIDTH
        self.h = DEFAULT_HEIGHT
        self.label = label
        self.color = color
        self.visible = True
        self.data = ""

    def rect(self):
        """Bounding rectangle of the capture area."""
        return QRect(self.x, self.y, self.w, self.h)

    def label_rect(self):
        """Rectangle of the label strip sitting directly above the region."""
        return QRect(self.x, self.y - LABEL_HEIGHT, self.w, LABEL_HEIGHT)

    def resize_handles(self):
        """Small squares centered on the four corners, used as resize grips.

        Order: top-left, top-right, bottom-left, bottom-right.
        """
        half = HANDLE_SIZE // 2
        corners = (
            (self.x, self.y),
            (self.x + self.w, self.y),
            (self.x, self.y + self.h),
            (self.x + self.w, self.y + self.h),
        )
        return [QRect(cx - half, cy - half, HANDLE_SIZE, HANDLE_SIZE)
                for cx, cy in corners]
# =============================================================================
# Flask API Server
# =============================================================================
@app.route('/data')
def get_data():
    """Serve the most recent OCR readings as a JSON payload."""
    snapshot = latest_data
    return jsonify(snapshot)
def collect_ocr_data():
    """
    Background worker: OCR the shared screen region in an endless loop.

    Every POLLING_RATE_MS milliseconds, grabs the region currently selected
    in the UI (read under region_lock), preprocesses it, runs Tesseract, and
    publishes the parsed stats into the global `latest_data` dict served by
    the Flask /data endpoint.

    Runs forever; intended to be started as a daemon thread.
    """
    global latest_data
    while True:
        # Snapshot the region geometry under the lock so a concurrent UI
        # drag/resize cannot hand us a half-updated rectangle.
        region_lock.lock()
        x, y, w, h = shared_region["x"], shared_region["y"], shared_region["w"], shared_region["h"]
        region_lock.unlock()
        screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h))
        # Debug artifacts, overwritten every cycle so capture can be verified.
        screenshot.save("debug_screenshot.png")
        processed = preprocess_image(screenshot)
        processed.save("debug_processed.png")
        # --psm 4: single column of variably-sized text lines; --oem 1: LSTM engine.
        text = pytesseract.image_to_string(processed, config='--psm 4 --oem 1')
        stats = parse_all_stats(text.strip())
        hp_cur, hp_max = stats["hp"]
        mp_cur, mp_max = stats["mp"]
        fp_cur, fp_max = stats["fp"]
        exp_val = stats["exp"]
        # Rebind (rather than mutate) so readers never observe a partially
        # updated dict.
        latest_data = {
            "hp_current": hp_cur,
            "hp_total": hp_max,
            "mp_current": mp_cur,
            "mp_total": mp_max,
            "fp_current": fp_cur,
            "fp_total": fp_max,
            "exp": exp_val
        }
        # DEBUG OUTPUT
        print(f"Flyff - Character Status: HP: {hp_cur}/{hp_max}, MP: {mp_cur}/{mp_max}, FP: {fp_cur}/{fp_max}, EXP: {exp_val}%")
        # Fix: honor the configured polling rate instead of a hard-coded 0.5 s,
        # so POLLING_RATE_MS actually controls the loop cadence (same value today).
        time.sleep(POLLING_RATE_MS / 1000.0)
# =============================================================================
# OverlayCanvas (UI)
# =============================================================================
class OverlayCanvas(QWidget):
    """
    Full-screen transparent Qt overlay that draws the OCR capture region and
    lets the user drag it, or resize it from two corner handles.

    Mutates the module-level `shared_region` dict in place (under
    `region_lock`) so the OCR thread always sees the current geometry.
    """
    def __init__(self, parent=None):
        super().__init__(parent)
        # Cover the whole primary screen so the region can be dragged anywhere.
        screen_geo = QApplication.primaryScreen().geometry()
        self.setGeometry(screen_geo)
        # Borderless, always-on-top, transparent background: only the painted
        # rectangle and caption are visible.
        self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint)
        self.setAttribute(Qt.WA_TranslucentBackground, True)
        # Alias of the module-level dict — mutations here are what the OCR
        # thread reads.
        self.region = shared_region
        self.drag_offset = None      # QPoint grab offset while dragging, else None
        self.selected_handle = None  # index into resize_handles() while resizing, else None
    def paintEvent(self, event):
        """Draw the capture rectangle and its caption."""
        painter = QPainter(self)
        pen = QPen(QColor(0, 0, 255))
        pen.setWidth(5)  # thick border so it stands out over the game window
        painter.setPen(pen)
        # NOTE(review): geometry is read here without taking region_lock —
        # inconsistent with the mouse handlers below; presumably tolerable for
        # a repaint, but confirm.
        painter.drawRect(self.region["x"], self.region["y"], self.region["w"], self.region["h"])
        painter.setFont(QFont("Arial", 12, QFont.Bold))
        painter.setPen(QColor(0, 0, 255))
        painter.drawText(self.region["x"], self.region["y"] - 5, "Character Status")
    def mousePressEvent(self, event):
        """Start a resize (if a handle was hit) or a drag (if the body was hit)."""
        if event.button() == Qt.LeftButton:
            region_lock.lock() # Lock for thread safety
            x, y, w, h = self.region["x"], self.region["y"], self.region["w"], self.region["h"]
            region_lock.unlock()
            # Handles take priority over the body, so check them first.
            for i, handle in enumerate(self.resize_handles()):
                if handle.contains(event.pos()):
                    self.selected_handle = i
                    return
            if QRect(x, y, w, h).contains(event.pos()):
                # Remember where inside the rectangle the grab happened so the
                # region doesn't jump to the cursor on the first move.
                self.drag_offset = event.pos() - QPoint(x, y)
    def mouseMoveEvent(self, event):
        """Resize via the active handle, or drag the whole region."""
        if self.selected_handle is not None:
            region_lock.lock()
            sr = self.region
            if self.selected_handle == 0: # Top-left: move origin, adjust size to keep the opposite corner fixed
                sr["w"] += sr["x"] - event.x()
                sr["h"] += sr["y"] - event.y()
                sr["x"] = event.x()
                sr["y"] = event.y()
            elif self.selected_handle == 1: # Bottom-right: size follows the cursor
                sr["w"] = event.x() - sr["x"]
                sr["h"] = event.y() - sr["y"]
            # Enforce a minimum size so the region can't collapse or invert.
            sr["w"] = max(sr["w"], 10)
            sr["h"] = max(sr["h"], 10)
            region_lock.unlock()
            self.update()
        elif self.drag_offset:
            region_lock.lock()
            self.region["x"] = event.x() - self.drag_offset.x()
            self.region["y"] = event.y() - self.drag_offset.y()
            region_lock.unlock()
            self.update()
    def mouseReleaseEvent(self, event):
        """Finish any in-progress drag or resize."""
        self.selected_handle = None
        self.drag_offset = None
    def resize_handles(self):
        """Return the two grab handles (top-left, bottom-right) — their list
        indices match the handle numbers tested in mouseMoveEvent."""
        return [
            QRect(self.region["x"] - HANDLE_SIZE // 2, self.region["y"] - HANDLE_SIZE // 2, HANDLE_SIZE, HANDLE_SIZE),
            QRect(self.region["x"] + self.region["w"] - HANDLE_SIZE // 2, self.region["y"] + self.region["h"] - HANDLE_SIZE // 2, HANDLE_SIZE, HANDLE_SIZE)
        ]
# =============================================================================
# Start Application
# =============================================================================
def run_flask_app():
    """Serve the Flask API on localhost:5000 (blocking call)."""
    host, port = "127.0.0.1", 5000
    app.run(host=host, port=port)
if __name__ == '__main__':
    # OCR collector runs as a daemon thread so it dies with the GUI process.
    collector_thread = threading.Thread(target=collect_ocr_data, daemon=True)
    collector_thread.start()
    # Flask API likewise in a daemon thread (app.run blocks its thread).
    flask_thread = threading.Thread(target=run_flask_app, daemon=True)
    flask_thread.start()
    # The Qt event loop must own the main thread.
    app_gui = QApplication([])
    overlay_window = OverlayCanvas()
    overlay_window.show()
    # Blocks until the overlay window is closed.
    app_gui.exec_()