Borealis-Legacy/Modules/data_collector.py

282 lines
9.0 KiB
Python

# Modules/data_collector.py
import threading
import time
import re
import sys
import numpy as np
import cv2
# Vision-related Imports
import pytesseract
import easyocr
import torch
from PIL import Image, ImageGrab, ImageFilter
from PyQt5.QtWidgets import QApplication, QWidget
from PyQt5.QtCore import QRect, QPoint, Qt, QMutex, QTimer
from PyQt5.QtGui import QPainter, QPen, QColor, QFont
# Initialize EasyOCR readers for English at import time.
# reader_cpu is created with the GPU disabled. It is not referenced in the
# visible part of this module -- NOTE(review): confirm whether it is used elsewhere.
reader_cpu = easyocr.Reader(['en'], gpu=False)
# reader_gpu enables the GPU only when torch reports CUDA as available.
# find_word_positions() uses it for every ocr_engine value other than "CPU".
reader_gpu = easyocr.Reader(['en'], gpu=True if torch.cuda.is_available() else False)
# Hard-coded Windows path of the Tesseract executable that pytesseract invokes.
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
# Default width/height of a new OCR region (defaults of create_ocr_region's w and h).
DEFAULT_WIDTH = 180
DEFAULT_HEIGHT = 130
# Side length of the square resize handles built in OCRRegionWidget._resize_handles().
HANDLE_SIZE = 8
# Not referenced in the visible part of this module.
LABEL_HEIGHT = 20
# Locked around the accesses to `regions` made in this module.
collector_mutex = QMutex()
# Maps region_id -> {'bbox': [x, y, w, h], 'raw_text': str, 'widget': OCRRegionWidget}.
regions = {}
# QApplication reference, set lazily by _ensure_qapplication().
app_instance = None
def _ensure_qapplication():
    """
    Ensure a QApplication exists before any widget is created.

    An application object that already exists in this process (for example
    one created by the host program) is reused instead of constructing a
    second one, because Qt supports only a single application object per
    process. A new QApplication is created only when none exists yet.

    Must be called from the main thread.
    """
    global app_instance
    if app_instance is None:
        # QApplication.instance() returns None when no application object
        # has been created yet.
        existing = QApplication.instance()
        app_instance = existing if existing is not None else QApplication(sys.argv)
def create_ocr_region(region_id, x=250, y=50, w=DEFAULT_WIDTH, h=DEFAULT_HEIGHT, color=(255, 255, 0), thickness=2):
    """
    Create an OCR region with a visible, resizable box on the screen.

    Does nothing when a region with the same ``region_id`` is already
    registered.

    Args:
        region_id: Key under which the region is stored in ``regions``.
        x, y: Screen position of the region's top-left corner.
        w, h: Initial width and height of the region.
        color: RGB tuple used for the box outline.
        thickness: Outline width passed to the widget's pen.
    """
    _ensure_qapplication()
    collector_mutex.lock()
    try:
        if region_id in regions:
            return
        regions[region_id] = {
            'bbox': [x, y, w, h],
            'raw_text': "",
            'widget': OCRRegionWidget(x, y, w, h, region_id, color, thickness),
        }
    finally:
        # Always release the mutex, even if building the widget raises;
        # otherwise every later lock() on it would block forever.
        collector_mutex.unlock()
def get_raw_text(region_id):
    """
    Return the most recently stored OCR text for ``region_id``.

    An empty string is returned when no such region is registered.
    """
    collector_mutex.lock()
    entry = regions.get(region_id)
    latest_text = "" if entry is None else entry['raw_text']
    collector_mutex.unlock()
    return latest_text
def start_collector():
    """Launch the OCR update loop on a daemon thread."""
    threading.Thread(target=_update_ocr_loop, daemon=True).start()
def _update_ocr_loop():
    """
    Worker loop: repeatedly OCR every registered region with Tesseract.

    Each pass snapshots the region ids, then for every region captures its
    bounding box from the screen, preprocesses the image, and stores the
    recognized text in ``regions[rid]['raw_text']``. The loop sleeps 0.7 s
    between passes and never returns.

    ``collector_mutex`` is held only while ``regions`` is read or written,
    never during the screen capture or the OCR call.
    """
    while True:
        collector_mutex.lock()
        try:
            region_ids = list(regions)
        finally:
            collector_mutex.unlock()

        for rid in region_ids:
            collector_mutex.lock()
            try:
                entry = regions.get(rid)
                # Copy the bbox so later updates by the widget cannot
                # change the values while they are being used here.
                bbox = entry['bbox'][:] if entry is not None else None
            finally:
                collector_mutex.unlock()
            if bbox is None:
                # The region is no longer registered (the id snapshot above
                # is taken without holding the lock for the whole pass).
                continue

            x, y, w, h = bbox
            try:
                screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h))
                processed = _preprocess_image(screenshot)
                raw_text = pytesseract.image_to_string(processed, config='--psm 6 --oem 1')
            except Exception as e:
                # One failed capture/OCR must not kill the worker thread;
                # keep the previous text and move on to the next region.
                print(f"[ERROR] OCR update failed for region {rid!r}: {e}")
                continue

            collector_mutex.lock()
            try:
                if rid in regions:
                    regions[rid]['raw_text'] = raw_text
            finally:
                collector_mutex.unlock()

        time.sleep(0.7)
def _preprocess_image(image):
    """
    Prepare a captured image for OCR.

    Converts to grayscale, enlarges by a factor of 3 in both dimensions,
    binarizes (values above 200 become 255, everything else 0) and applies a
    size-3 median filter.
    """
    grayscale = image.convert("L")
    upscaled_size = (grayscale.width * 3, grayscale.height * 3)
    upscaled = grayscale.resize(upscaled_size)
    binarized = upscaled.point(lambda px: 0 if px <= 200 else 255)
    return binarized.filter(ImageFilter.MedianFilter(3))
def find_word_positions(region_id, word, offset_x=0, offset_y=0, margin=5, ocr_engine="CPU"):
    """
    Locate occurrences of ``word`` inside an OCR region.

    The region is captured from the screen and passed to the OCR engine
    without preprocessing. ``word`` is matched literally (regex
    metacharacters are escaped), case-insensitively, between word boundaries.

    Args:
        region_id: Key of the region in ``regions``.
        word: Text to search for.
        offset_x, offset_y: Added to the x / y of every reported box.
        margin: Each box's width and height are enlarged by ``2 * margin``;
            the x / y origin is not shifted by the margin.
        ocr_engine: ``"CPU"`` uses Tesseract; any other value uses the
            EasyOCR reader ``reader_gpu``.

    Returns:
        A list of ``(x, y, w, h)`` tuples in the coordinate system of the
        captured region image. The list is empty when the region is unknown,
        its bounds are invalid, or capture/OCR fails (an error is printed in
        the latter two cases).
    """
    collector_mutex.lock()
    try:
        entry = regions.get(region_id)
        # Copy the bbox so the values used below stay consistent.
        bbox = list(entry['bbox']) if entry is not None else None
    finally:
        collector_mutex.unlock()
    if bbox is None:
        return []

    # Extract OCR region position and size
    x, y, w, h = bbox
    left, top, right, bottom = x, y, x + w, y + h
    if right <= left or bottom <= top:
        print(f"[ERROR] Invalid OCR region bounds: {bbox}")
        return []

    try:
        # Escape the word so characters such as "+", "(" or "." are matched
        # literally instead of being interpreted as regex syntax; compile
        # once rather than on every OCR result.
        pattern = re.compile(rf"\b{re.escape(str(word))}\b", re.IGNORECASE)

        # Capture raw screen image (NO preprocessing)
        image = ImageGrab.grab(bbox=(left, top, right, bottom))
        word_positions = []

        if ocr_engine == "CPU":
            # Use Tesseract directly on raw PIL image (no preprocessing)
            data = pytesseract.image_to_data(image, config='--psm 6 --oem 1', output_type=pytesseract.Output.DICT)
            rows = zip(data['text'], data['left'], data['top'], data['width'], data['height'])
            for text, box_left, box_top, box_width, box_height in rows:
                if pattern.search(text):
                    word_positions.append((
                        int(box_left) + offset_x,
                        int(box_top) + offset_y,
                        int(box_width) + (margin * 2),
                        int(box_height) + (margin * 2),
                    ))
        else:
            # Convert PIL image to NumPy array for EasyOCR
            image_np = np.array(image)
            # Run GPU OCR
            results = reader_gpu.readtext(image_np)
            for (corners, text, _) in results:
                if pattern.search(text):
                    # corners[0] / corners[2] are used as opposite corners
                    # (min and max) of the detected text box.
                    (x_min, y_min), (x_max, y_max) = corners[0], corners[2]
                    word_positions.append((
                        int(x_min) + offset_x,
                        int(y_min) + offset_y,
                        int(x_max - x_min) + (margin * 2),
                        int(y_max - y_min) + (margin * 2),
                    ))
        return word_positions
    except Exception as e:
        print(f"[ERROR] Failed to capture OCR region: {e}")
        return []
def draw_identification_boxes(region_id, positions, color=(0, 0, 255), thickness=2):
    """
    Draw non-interactive rectangles at ``positions`` inside the given OCR region.

    Args:
        region_id: Key of the region in ``regions``; unknown ids are ignored.
        positions: Iterable of ``(x, y, w, h)`` rectangles handed to the
            region widget.
        color: RGB tuple forwarded to the widget.
        thickness: Line width forwarded to the widget.
    """
    collector_mutex.lock()
    try:
        entry = regions.get(region_id)
        if entry is not None and 'widget' in entry:
            entry['widget'].set_draw_positions(positions, color, thickness)
    finally:
        # Release the mutex even if the widget call raises (for example on
        # a malformed colour tuple), so other callers are not blocked forever.
        collector_mutex.unlock()
class OCRRegionWidget(QWidget):
    """
    Frameless, always-on-top, translucent window that outlines one OCR region.

    The widget can be dragged with the left mouse button and resized via two
    square handles (top-left and bottom-right). Every move or resize writes
    the new geometry back to ``regions[region_id]['bbox']`` under
    ``collector_mutex``.
    """

    # Smallest width/height a region may be resized to.
    MIN_SIZE = 20

    def __init__(self, x, y, w, h, region_id, color, thickness):
        """
        Create and immediately show the region window.

        Args:
            x, y, w, h: Initial geometry of the window.
            region_id: Key of this region in ``regions``.
            color: RGB tuple for the outline colour.
            thickness: Pen width used for all rectangles.
        """
        super().__init__()
        self.setGeometry(x, y, w, h)
        self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint | Qt.Tool)
        self.setAttribute(Qt.WA_TranslucentBackground, True)
        self.setAttribute(Qt.WA_TransparentForMouseEvents, False)
        self.drag_offset = None       # press position while moving, else None
        self.selected_handle = None   # index into _resize_handles() while resizing
        self.region_id = region_id
        self.box_color = QColor(*color)
        self.line_thickness = thickness
        self.draw_positions = []      # (x, y, w, h) overlay rectangles
        self.show()

    def paintEvent(self, event):
        """Paint the region outline and every overlay rectangle with one pen."""
        painter = QPainter(self)
        pen = QPen(self.box_color)
        pen.setWidth(self.line_thickness)
        painter.setPen(pen)
        # Draw main rectangle
        painter.drawRect(0, 0, self.width(), self.height())
        # Draw detected word overlays
        for x, y, w, h in self.draw_positions:
            painter.drawRect(x, y, w, h)

    def set_draw_positions(self, positions, color, thickness):
        """
        Update the overlay positions and visual settings, then repaint.

        Note that ``color`` and ``thickness`` also apply to the main outline,
        because paintEvent uses a single pen for everything.
        """
        self.draw_positions = positions
        self.box_color = QColor(*color)
        self.line_thickness = thickness
        self.update()

    def _resize_handles(self):
        """Return the handle rectangles: index 0 top-left, index 1 bottom-right."""
        w, h = self.width(), self.height()
        return [
            QRect(0, 0, HANDLE_SIZE, HANDLE_SIZE),  # Top-left
            QRect(w - HANDLE_SIZE, h - HANDLE_SIZE, HANDLE_SIZE, HANDLE_SIZE),  # Bottom-right
        ]

    def _store_bbox(self, x, y, w, h):
        """Write the given geometry to this region's entry in ``regions``."""
        collector_mutex.lock()
        try:
            if self.region_id in regions:
                regions[self.region_id]['bbox'] = [x, y, w, h]
        finally:
            collector_mutex.unlock()

    def mousePressEvent(self, event):
        """Begin a resize (press on a handle) or a move (press elsewhere)."""
        if event.button() != Qt.LeftButton:
            return
        # Reset state left over from a previous gesture, so that a press on
        # the body after a handle resize starts a move, not another resize.
        self.selected_handle = None
        self.drag_offset = None
        for i, handle in enumerate(self._resize_handles()):
            if handle.contains(event.pos()):
                self.selected_handle = i
                return
        self.drag_offset = event.pos()

    def mouseMoveEvent(self, event):
        """Resize or move the window to follow the cursor and sync the bbox."""
        if self.selected_handle is not None:
            if self.selected_handle == 0:  # Top-left
                # Keep the bottom-right corner fixed: derive the origin from
                # the clamped size, so hitting the minimum size stops the
                # resize instead of sliding the whole box.
                right = self.x() + self.width()
                bottom = self.y() + self.height()
                new_w = max(right - event.globalX(), self.MIN_SIZE)
                new_h = max(bottom - event.globalY(), self.MIN_SIZE)
                self.setGeometry(right - new_w, bottom - new_h, new_w, new_h)
            elif self.selected_handle == 1:  # Bottom-right
                new_w = max(event.globalX() - self.x(), self.MIN_SIZE)
                new_h = max(event.globalY() - self.y(), self.MIN_SIZE)
                self.setGeometry(self.x(), self.y(), new_w, new_h)
            self._store_bbox(self.x(), self.y(), self.width(), self.height())
            self.update()
        elif self.drag_offset is not None:
            new_x = event.globalX() - self.drag_offset.x()
            new_y = event.globalY() - self.drag_offset.y()
            self.move(new_x, new_y)
            self._store_bbox(new_x, new_y, self.width(), self.height())