# Modules/data_collector.py import threading import time import re import sys import numpy as np import cv2 import concurrent.futures # Vision-related Imports import pytesseract import easyocr import torch from PIL import Image, ImageGrab, ImageFilter from PyQt5.QtWidgets import QApplication, QWidget from PyQt5.QtCore import QRect, QPoint, Qt, QMutex, QTimer from PyQt5.QtGui import QPainter, QPen, QColor, QFont # Initialize EasyOCR with CUDA support reader_cpu = easyocr.Reader(['en'], gpu=False) reader_gpu = easyocr.Reader(['en'], gpu=True if torch.cuda.is_available() else False) pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe" DEFAULT_WIDTH = 180 DEFAULT_HEIGHT = 130 HANDLE_SIZE = 8 LABEL_HEIGHT = 20 collector_mutex = QMutex() regions = {} app_instance = None def _ensure_qapplication(): """ Ensures that QApplication is initialized before creating widgets. Must be called from the main thread. """ global app_instance if app_instance is None: app_instance = QApplication(sys.argv) # Start in main thread def capture_region_as_image(region_id): collector_mutex.lock() if region_id not in regions: collector_mutex.unlock() return None x, y, w, h = regions[region_id]['bbox'][:] collector_mutex.unlock() screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h)) return screenshot def create_ocr_region(region_id, x=250, y=50, w=DEFAULT_WIDTH, h=DEFAULT_HEIGHT, color=(255, 255, 0), thickness=2): """ Creates an OCR region with a visible, resizable box on the screen. Allows setting custom color (RGB) and line thickness. """ _ensure_qapplication() collector_mutex.lock() if region_id in regions: collector_mutex.unlock() return regions[region_id] = { 'bbox': [x, y, w, h], 'raw_text': "", 'widget': OCRRegionWidget(x, y, w, h, region_id, color, thickness) } collector_mutex.unlock() def get_raw_text(region_id): collector_mutex.lock() if region_id not in regions: collector_mutex.unlock() return "" text = regions[region_id]['raw_text'] collector_mutex.unlock() return text def start_collector(): t = threading.Thread(target=_update_ocr_loop, daemon=True) t.start() def _update_ocr_loop(): while True: collector_mutex.lock() region_ids = list(regions.keys()) collector_mutex.unlock() for rid in region_ids: collector_mutex.lock() bbox = regions[rid]['bbox'][:] collector_mutex.unlock() x, y, w, h = bbox screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h)) processed = _preprocess_image(screenshot) raw_text = pytesseract.image_to_string(processed, config='--psm 6 --oem 1') collector_mutex.lock() if rid in regions: regions[rid]['raw_text'] = raw_text collector_mutex.unlock() time.sleep(0.7) def _preprocess_image(image): gray = image.convert("L") scaled = gray.resize((gray.width * 3, gray.height * 3)) thresh = scaled.point(lambda p: 255 if p > 200 else 0) return thresh.filter(ImageFilter.MedianFilter(3)) def find_word_positions(region_id, word, offset_x=0, offset_y=0, margin=5, ocr_engine="CPU", num_slices=1): """ Uses user-defined horizontal slices and threading for faster inference. """ collector_mutex.lock() if region_id not in regions: collector_mutex.unlock() return [] bbox = regions[region_id]['bbox'] collector_mutex.unlock() x, y, w, h = bbox left, top, right, bottom = x, y, x + w, y + h if right <= left or bottom <= top: print(f"[ERROR] Invalid OCR region bounds: {bbox}") return [] try: image = ImageGrab.grab(bbox=(left, top, right, bottom)) orig_width, orig_height = image.size word_positions = [] # Ensure number of slices does not exceed image height num_slices = min(num_slices, orig_height) strip_height = max(1, orig_height // num_slices) def process_strip(strip_id): strip_y = strip_id * strip_height strip = image.crop((0, strip_y, orig_width, strip_y + strip_height)) strip_np = np.array(strip) detected_positions = [] if ocr_engine == "CPU": ocr_data = pytesseract.image_to_data(strip, config='--psm 6 --oem 1', output_type=pytesseract.Output.DICT) for i in range(len(ocr_data['text'])): if re.search(rf"\b{word}\b", ocr_data['text'][i], re.IGNORECASE): x_scaled = int(ocr_data['left'][i]) y_scaled = int(ocr_data['top'][i]) + strip_y w_scaled = int(ocr_data['width'][i]) h_scaled = int(ocr_data['height'][i]) detected_positions.append((x_scaled + offset_x, y_scaled + offset_y, w_scaled + (margin * 2), h_scaled + (margin * 2))) else: results = reader_gpu.readtext(strip_np) for (bbox, text, _) in results: if re.search(rf"\b{word}\b", text, re.IGNORECASE): (x_min, y_min), (x_max, y_max) = bbox[0], bbox[2] x_scaled = int(x_min) y_scaled = int(y_min) + strip_y w_scaled = int(x_max - x_min) h_scaled = int(y_max - y_min) detected_positions.append((x_scaled + offset_x, y_scaled + offset_y, w_scaled + (margin * 2), h_scaled + (margin * 2))) return detected_positions with concurrent.futures.ThreadPoolExecutor(max_workers=num_slices) as executor: strip_results = list(executor.map(process_strip, range(num_slices))) for strip_result in strip_results: word_positions.extend(strip_result) return word_positions except Exception as e: print(f"[ERROR] Failed to capture OCR region: {e}") return [] def draw_identification_boxes(region_id, positions, color=(0, 0, 255), thickness=2): """ Draws non-interactive rectangles at specified positions within the given OCR region. Uses a separate rendering thread to prevent blocking OCR processing. """ collector_mutex.lock() if region_id in regions and 'widget' in regions[region_id]: widget = regions[region_id]['widget'] widget.update_draw_positions(positions, color, thickness) collector_mutex.unlock() def update_region_slices(region_id, num_slices): """ Updates the number of visual slices in the OCR region. """ collector_mutex.lock() if region_id in regions and 'widget' in regions[region_id]: widget = regions[region_id]['widget'] widget.set_num_slices(num_slices) collector_mutex.unlock() class OCRRegionWidget(QWidget): def __init__(self, x, y, w, h, region_id, color, thickness): super().__init__() self.setGeometry(x, y, w, h) self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint | Qt.Tool) self.setAttribute(Qt.WA_TranslucentBackground, True) self.setAttribute(Qt.WA_TransparentForMouseEvents, False) self.region_id = region_id self.box_color = QColor(*color) self.line_thickness = thickness self.draw_positions = [] self.previous_positions = [] # This prevents redundant redraws self.num_slices = 1 # Ensures slice count is initialized self.show() def paintEvent(self, event): painter = QPainter(self) pen = QPen(self.box_color) pen.setWidth(self.line_thickness) painter.setPen(pen) # Draw main rectangle painter.drawRect(0, 0, self.width(), self.height()) # Draw detected word overlays for x, y, w, h in self.draw_positions: painter.drawRect(x, y, w, h) # Draw faint slice division lines if self.num_slices > 1: strip_height = self.height() // self.num_slices pen.setColor(QColor(150, 150, 150, 100)) # Light gray, semi-transparent pen.setWidth(1) painter.setPen(pen) for i in range(1, self.num_slices): # Do not draw the last one at the bottom painter.drawLine(0, i * strip_height, self.width(), i * strip_height) def set_draw_positions(self, positions, color, thickness): """ Updates the overlay positions and visual settings. """ self.draw_positions = positions self.box_color = QColor(*color) self.line_thickness = thickness self.update() def update_draw_positions(self, positions, color, thickness): """ Updates the overlay positions and redraws only if the positions have changed. This prevents unnecessary flickering. """ if positions == self.previous_positions: return # No change, do not update self.previous_positions = positions # Store last known positions self.draw_positions = positions self.box_color = QColor(*color) self.line_thickness = thickness self.update() # Redraw only if needed def set_num_slices(self, num_slices): """ Updates the number of horizontal slices for visualization. """ self.num_slices = num_slices self.update() def _resize_handles(self): w, h = self.width(), self.height() return [ QRect(0, 0, HANDLE_SIZE, HANDLE_SIZE), # Top-left QRect(w - HANDLE_SIZE, h - HANDLE_SIZE, HANDLE_SIZE, HANDLE_SIZE) # Bottom-right ] def mousePressEvent(self, event): if event.button() == Qt.LeftButton: for i, handle in enumerate(self._resize_handles()): if handle.contains(event.pos()): self.selected_handle = i return self.drag_offset = event.pos() def mouseMoveEvent(self, event): if self.selected_handle is not None: w, h = self.width(), self.height() if self.selected_handle == 0: # Top-left new_w = w + (self.x() - event.globalX()) new_h = h + (self.y() - event.globalY()) new_x = event.globalX() new_y = event.globalY() if new_w < 20: new_w = 20 if new_h < 20: new_h = 20 self.setGeometry(new_x, new_y, new_w, new_h) elif self.selected_handle == 1: # Bottom-right new_w = event.globalX() - self.x() new_h = event.globalY() - self.y() if new_w < 20: new_w = 20 if new_h < 20: new_h = 20 self.setGeometry(self.x(), self.y(), new_w, new_h) collector_mutex.lock() if self.region_id in regions: regions[self.region_id]["bbox"] = [self.x(), self.y(), self.width(), self.height()] collector_mutex.unlock() self.update() elif self.drag_offset: new_x = event.globalX() - self.drag_offset.x() new_y = event.globalY() - self.drag_offset.y() self.move(new_x, new_y) collector_mutex.lock() if self.region_id in regions: regions[self.region_id]["bbox"] = [new_x, new_y, self.width(), self.height()] collector_mutex.unlock()