Borealis-Legacy/Modules/data_collector.py

207 lines
6.9 KiB
Python

# Modules/data_collector.py
import threading
import time
import re
import numpy as np
import cv2
import pytesseract
from PIL import Image, ImageGrab, ImageFilter
from PyQt5.QtWidgets import QWidget, QApplication
from PyQt5.QtCore import QRect, QPoint, Qt, QMutex, QTimer
from PyQt5.QtGui import QPainter, QPen, QColor, QFont
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
DEFAULT_WIDTH = 180
DEFAULT_HEIGHT = 130
HANDLE_SIZE = 8
LABEL_HEIGHT = 20
collector_mutex = QMutex()
regions = {}
overlay_window = None
def create_ocr_region(region_id, x=250, y=50, w=DEFAULT_WIDTH, h=DEFAULT_HEIGHT):
collector_mutex.lock()
if region_id in regions:
collector_mutex.unlock()
return
regions[region_id] = {
'bbox': [x, y, w, h],
'raw_text': ""
}
collector_mutex.unlock()
_ensure_overlay()
def get_raw_text(region_id):
collector_mutex.lock()
if region_id not in regions:
collector_mutex.unlock()
return ""
text = regions[region_id]['raw_text']
collector_mutex.unlock()
return text
def start_collector():
t = threading.Thread(target=_update_ocr_loop, daemon=True)
t.start()
def _update_ocr_loop():
while True:
collector_mutex.lock()
region_ids = list(regions.keys())
collector_mutex.unlock()
for rid in region_ids:
collector_mutex.lock()
bbox = regions[rid]['bbox'][:]
collector_mutex.unlock()
x, y, w, h = bbox
screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h))
processed = _preprocess_image(screenshot)
raw_text = pytesseract.image_to_string(processed, config='--psm 4 --oem 1')
collector_mutex.lock()
if rid in regions:
regions[rid]['raw_text'] = raw_text
collector_mutex.unlock()
time.sleep(0.7)
def _preprocess_image(image):
gray = image.convert("L")
scaled = gray.resize((gray.width * 3, gray.height * 3))
thresh = scaled.point(lambda p: 255 if p > 200 else 0)
return thresh.filter(ImageFilter.MedianFilter(3))
def _ensure_overlay():
"""
Creates the overlay window if none exists.
If no QApplication instance is running yet, schedule the creation after
the main application event loop starts (to avoid "Must construct a QApplication first" errors).
"""
global overlay_window
if overlay_window is not None:
return
# If there's already a running QApplication, create overlay immediately.
if QApplication.instance() is not None:
overlay_window = OverlayCanvas()
overlay_window.show()
else:
# Schedule creation for when the app event loop is up.
def delayed_create():
global overlay_window
if overlay_window is None:
overlay_window = OverlayCanvas()
overlay_window.show()
QTimer.singleShot(0, delayed_create)
class OverlayCanvas(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
screen_geo = QApplication.primaryScreen().geometry()
self.setGeometry(screen_geo)
self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint)
self.setAttribute(Qt.WA_TranslucentBackground, True)
self.drag_offset = None
self.selected_handle = None
self.selected_region_id = None
def paintEvent(self, event):
painter = QPainter(self)
pen = QPen(QColor(0, 0, 255))
pen.setWidth(5)
painter.setPen(pen)
collector_mutex.lock()
region_copy = {rid: data['bbox'][:] for rid, data in regions.items()}
collector_mutex.unlock()
for rid, bbox in region_copy.items():
x, y, w, h = bbox
painter.drawRect(x, y, w, h)
painter.setFont(QFont("Arial", 12, QFont.Bold))
painter.setPen(QColor(0, 0, 255))
painter.drawText(x, y - 5, f"OCR Region: {rid}")
def mousePressEvent(self, event):
if event.button() == Qt.LeftButton:
collector_mutex.lock()
all_items = list(regions.items())
collector_mutex.unlock()
for rid, data in all_items:
x, y, w, h = data['bbox']
handles = self._resize_handles(x, y, w, h)
for i, handle_rect in enumerate(handles):
if handle_rect.contains(event.pos()):
self.selected_handle = i
self.selected_region_id = rid
return
if QRect(x, y, w, h).contains(event.pos()):
self.drag_offset = event.pos() - QPoint(x, y)
self.selected_region_id = rid
return
def mouseMoveEvent(self, event):
if not self.selected_region_id:
return
collector_mutex.lock()
if self.selected_region_id not in regions:
collector_mutex.unlock()
return
bbox = regions[self.selected_region_id]['bbox']
collector_mutex.unlock()
x, y, w, h = bbox
if self.selected_handle is not None:
# resizing
if self.selected_handle == 0: # top-left
new_w = w + (x - event.x())
new_h = h + (y - event.y())
new_x = event.x()
new_y = event.y()
if new_w < 10: new_w = 10
if new_h < 10: new_h = 10
collector_mutex.lock()
if self.selected_region_id in regions:
regions[self.selected_region_id]['bbox'] = [new_x, new_y, new_w, new_h]
collector_mutex.unlock()
elif self.selected_handle == 1: # bottom-right
new_w = event.x() - x
new_h = event.y() - y
if new_w < 10: new_w = 10
if new_h < 10: new_h = 10
collector_mutex.lock()
if self.selected_region_id in regions:
regions[self.selected_region_id]['bbox'] = [x, y, new_w, new_h]
collector_mutex.unlock()
self.update()
elif self.drag_offset:
# dragging
new_x = event.x() - self.drag_offset.x()
new_y = event.y() - self.drag_offset.y()
collector_mutex.lock()
if self.selected_region_id in regions:
regions[self.selected_region_id]['bbox'][0] = new_x
regions[self.selected_region_id]['bbox'][1] = new_y
collector_mutex.unlock()
self.update()
def mouseReleaseEvent(self, event):
self.selected_handle = None
self.drag_offset = None
self.selected_region_id = None
def _resize_handles(self, x, y, w, h):
return [
QRect(x - HANDLE_SIZE//2, y - HANDLE_SIZE//2, HANDLE_SIZE, HANDLE_SIZE), # top-left
QRect(x + w - HANDLE_SIZE//2, y + h - HANDLE_SIZE//2, HANDLE_SIZE, HANDLE_SIZE) # bottom-right
]