Restructured project and implemented virtual python environments to isolate application. Added launch scripts too.

This commit is contained in:
2025-02-27 19:38:25 -07:00
parent a2c0080662
commit 17b99ca836
76 changed files with 264 additions and 5 deletions

View File

@ -0,0 +1,49 @@
from OdenGraphQt import BaseNode
class ArrayNode(BaseNode):
"""
Array Node:
- Inputs: 'in' (value to store), 'ArraySize' (defines maximum length)
- Output: 'Array' (the current array as a string)
- Stores incoming values in an array with a size defined by ArraySize.
- Updates are now handled via a global update timer.
"""
__identifier__ = 'bunny-lab.io.array_node'
NODE_NAME = 'Array'
def __init__(self):
super(ArrayNode, self).__init__()
self.values = {} # Ensure values is a dictionary.
self.add_input('in')
self.add_input('ArraySize')
self.add_output('Array')
self.array = []
self.value = "[]" # Output as a string.
self.array_size = 10 # Default array size.
self.set_name("Array: []")
def process_input(self):
# Get array size from 'ArraySize' input if available.
size_port = self.input('ArraySize')
connected_size = size_port.connected_ports() if size_port is not None else []
if connected_size:
connected_port = connected_size[0]
parent_node = connected_port.node()
try:
self.array_size = int(float(getattr(parent_node, 'value', 10)))
except (ValueError, TypeError):
self.array_size = 10
# Get new value from 'in' input if available.
in_port = self.input('in')
connected_in = in_port.connected_ports() if in_port is not None else []
if connected_in:
connected_port = connected_in[0]
parent_node = connected_port.node()
new_value = getattr(parent_node, 'value', None)
if new_value is not None:
self.array.append(new_value)
while len(self.array) > self.array_size:
self.array.pop(0)
self.value = str(self.array)
self.set_name(f"Array: {self.value}")

View File

@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
Standardized Comparison Node:
- Compares two input values using a selected operator (==, !=, >, <, >=, <=).
- Outputs a result of 1 (True) or 0 (False).
- Uses a global update timer for processing.
- Supports an additional 'Input Type' dropdown to choose between 'Number' and 'String'.
"""
from OdenGraphQt import BaseNode
from Qt import QtCore
class ComparisonNode(BaseNode):
__identifier__ = 'bunny-lab.io.comparison_node'
NODE_NAME = 'Comparison Node'
def __init__(self):
super(ComparisonNode, self).__init__()
self.add_input('A')
self.add_input('B')
self.add_output('Result')
# Add the Input Type dropdown first.
self.add_combo_menu('input_type', 'Input Type', items=['Number', 'String'])
self.add_combo_menu('operator', 'Operator', items=[
'Equal (==)', 'Not Equal (!=)', 'Greater Than (>)',
'Less Than (<)', 'Greater Than or Equal (>=)', 'Less Than or Equal (<=)'
])
# Replace calc_result with a standardized "value" text input.
self.add_text_input('value', 'Value', text='0')
self.value = 0
self.set_name("Comparison Node")
self.processing = False # Guard for process_input
# Set default properties explicitly
self.set_property('input_type', 'Number')
self.set_property('operator', 'Equal (==)')
def process_input(self):
if self.processing:
return
self.processing = True
# Retrieve input values; if no connection or None, default to "0"
input_a = self.input(0)
input_b = self.input(1)
a_raw = (input_a.connected_ports()[0].node().get_property('value')
if input_a.connected_ports() else "0")
b_raw = (input_b.connected_ports()[0].node().get_property('value')
if input_b.connected_ports() else "0")
a_raw = a_raw if a_raw is not None else "0"
b_raw = b_raw if b_raw is not None else "0"
# Get input type property
input_type = self.get_property('input_type')
# Convert values based on input type
if input_type == 'Number':
try:
a_val = float(a_raw)
except (ValueError, TypeError):
a_val = 0.0
try:
b_val = float(b_raw)
except (ValueError, TypeError):
b_val = 0.0
elif input_type == 'String':
a_val = str(a_raw)
b_val = str(b_raw)
else:
try:
a_val = float(a_raw)
except (ValueError, TypeError):
a_val = 0.0
try:
b_val = float(b_raw)
except (ValueError, TypeError):
b_val = 0.0
operator = self.get_property('operator')
# Perform the comparison
result = {
'Equal (==)': a_val == b_val,
'Not Equal (!=)': a_val != b_val,
'Greater Than (>)': a_val > b_val,
'Less Than (<)': a_val < b_val,
'Greater Than or Equal (>=)': a_val >= b_val,
'Less Than or Equal (<=)': a_val <= b_val
}.get(operator, False)
new_value = 1 if result else 0
self.value = new_value
self.set_property('value', str(self.value))
self.transmit_data(self.value)
self.processing = False
def on_input_connected(self, input_port, output_port):
pass
def on_input_disconnected(self, input_port, output_port):
pass
def property_changed(self, property_name):
pass
def receive_data(self, data, source_port_name=None):
pass
def transmit_data(self, data):
output_port = self.output(0)
if output_port and output_port.connected_ports():
for connected_port in output_port.connected_ports():
connected_node = connected_port.node()
if hasattr(connected_node, 'receive_data'):
try:
data_int = int(data)
connected_node.receive_data(data_int, source_port_name='Result')
except ValueError:
pass

View File

@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
Standardized Data Node:
- Accepts and transmits values consistently.
- Updates its value based on a global update timer.
"""
from OdenGraphQt import BaseNode
from Qt import QtCore
class DataNode(BaseNode):
__identifier__ = 'bunny-lab.io.data_node'
NODE_NAME = 'Data Node'
def __init__(self):
super(DataNode, self).__init__()
self.add_input('Input')
self.add_output('Output')
self.add_text_input('value', 'Value', text='')
self.process_widget_event()
self.set_name("Data Node")
# Removed self-contained update timer; global timer now drives updates.
def post_create(self):
text_widget = self.get_widget('value')
if text_widget is not None:
try:
# Removed textChanged signal connection; global timer will call process_input.
pass
except Exception as e:
print("Error connecting textChanged signal:", e)
def process_widget_event(self, event=None):
current_text = self.get_property('value')
self.value = current_text
self.transmit_data(current_text)
def property_changed(self, property_name):
if property_name == 'value':
# Immediate update removed; relying on global timer.
pass
def process_input(self):
input_port = self.input(0)
output_port = self.output(0)
if input_port.connected_ports():
input_value = input_port.connected_ports()[0].node().get_property('value')
self.set_property('value', input_value)
self.transmit_data(input_value)
elif output_port.connected_ports():
self.transmit_data(self.get_property('value'))
def on_input_connected(self, input_port, output_port):
# Removed immediate update; global timer handles updates.
pass
def on_input_disconnected(self, input_port, output_port):
# Removed immediate update; global timer handles updates.
pass
def receive_data(self, data, source_port_name=None):
self.set_property('value', str(data))
self.transmit_data(data)
def transmit_data(self, data):
output_port = self.output(0)
if output_port and output_port.connected_ports():
for connected_port in output_port.connected_ports():
connected_node = connected_port.node()
if hasattr(connected_node, 'receive_data'):
connected_node.receive_data(data, source_port_name="Output")

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Identification Overlay Node:
- Users can configure threads/slices for parallel processing.
"""
import re
from OdenGraphQt import BaseNode
from PyQt5.QtCore import QTimer
from PyQt5.QtGui import QColor
from Modules import data_collector
class IdentificationOverlayNode(BaseNode):
__identifier__ = "bunny-lab.io.identification_overlay_node"
NODE_NAME = "Identification Overlay"
def __init__(self):
super(IdentificationOverlayNode, self).__init__()
# User-configurable options
self.add_text_input("search_term", "Search Term", text="Aibatt")
self.add_text_input("offset_value", "Offset Value (X,Y)", text="0,0") # X,Y Offset
self.add_text_input("margin", "Margin", text="5") # Box Margin
self.add_text_input("polling_freq", "Polling Frequency (ms)", text="500") # Polling Rate
self.add_combo_menu("ocr_engine", "Type", items=["CPU", "GPU"])
self.set_property("ocr_engine", "CPU") # Default to CPU mode
# Custom overlay options
self.add_text_input("overlay_color", "Overlay Color (RGB)", text="0,0,255") # Default blue
self.add_text_input("thickness", "Line Thickness", text="2") # Default 2px
self.add_text_input("threads_slices", "Threads / Slices", text="8") # Default 8 threads/slices
self.region_id = "identification_overlay"
data_collector.create_ocr_region(self.region_id, x=250, y=50, w=300, h=200, color=(0, 0, 255), thickness=2)
data_collector.start_collector()
self.set_name("Identification Overlay")
# Timer for updating overlays
self.timer = QTimer()
self.timer.timeout.connect(self.update_overlay)
# Set initial polling frequency
self.update_polling_frequency()
def update_polling_frequency(self):
polling_text = self.get_property("polling_freq")
try:
polling_interval = max(50, int(polling_text))
except ValueError:
polling_interval = 500
self.timer.start(polling_interval)
def update_overlay(self):
search_term = self.get_property("search_term")
offset_text = self.get_property("offset_value")
margin_text = self.get_property("margin")
ocr_engine = self.get_property("ocr_engine")
threads_slices_text = self.get_property("threads_slices")
self.update_polling_frequency()
try:
offset_x, offset_y = map(int, offset_text.split(","))
except ValueError:
offset_x, offset_y = 0, 0
try:
margin = int(margin_text)
except ValueError:
margin = 5
color_text = self.get_property("overlay_color")
try:
color = tuple(map(int, color_text.split(",")))
except ValueError:
color = (0, 0, 255)
thickness_text = self.get_property("thickness")
try:
thickness = max(1, int(thickness_text))
except ValueError:
thickness = 2
try:
num_slices = max(1, int(threads_slices_text)) # Ensure at least 1 slice
except ValueError:
num_slices = 1
if not search_term:
return
detected_positions = data_collector.find_word_positions(
self.region_id, search_term, offset_x, offset_y, margin, ocr_engine, num_slices
)
# Ensure slice count is updated visually in the region widget
data_collector.update_region_slices(self.region_id, num_slices)
data_collector.draw_identification_boxes(self.region_id, detected_positions, color=color, thickness=thickness)

View File

@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
Standardized Math Operation Node:
- Performs mathematical operations (+, -, *, /, avg) on two inputs.
- Outputs the computed result.
- Uses a global update timer for processing (defined in borealis.py).
- Ensures it always has a "value" property that the Comparison Node can read.
"""
from OdenGraphQt import BaseNode
from Qt import QtCore
class MathOperationNode(BaseNode):
__identifier__ = 'bunny-lab.io.math_node'
NODE_NAME = 'Math Operation'
def __init__(self):
super(MathOperationNode, self).__init__()
self.add_input('A')
self.add_input('B')
self.add_output('Result')
# Drop-down to choose which operation we do:
self.add_combo_menu('operator', 'Operator', items=[
'Add', 'Subtract', 'Multiply', 'Divide', 'Average'
])
# A text field for showing the result to the user:
self.add_text_input('calc_result', 'Result', text='0')
# IMPORTANT: define a "value" property that the Comparison Node can read
# We do not necessarily need a text input for it, but adding it ensures
# it becomes an official property recognized by OdenGraphQt.
self.add_text_input('value', 'Internal Value', text='0')
# Keep a Python-side float of the current computed result:
self.value = 0
# Give the node a nice name:
self.set_name("Math Operation")
# Removed self-contained timer; global timer calls process_input().
def process_input(self):
# Attempt to read "value" from both inputs:
input_a = self.input(0)
input_b = self.input(1)
a_raw = input_a.connected_ports()[0].node().get_property('value') if input_a.connected_ports() else "0"
b_raw = input_b.connected_ports()[0].node().get_property('value') if input_b.connected_ports() else "0"
try:
a_val = float(a_raw)
except (ValueError, TypeError):
a_val = 0.0
try:
b_val = float(b_raw)
except (ValueError, TypeError):
b_val = 0.0
operator = self.get_property('operator')
if operator == 'Add':
result = a_val + b_val
elif operator == 'Subtract':
result = a_val - b_val
elif operator == 'Multiply':
result = a_val * b_val
elif operator == 'Divide':
result = a_val / b_val if b_val != 0 else 0.0
elif operator == 'Average':
result = (a_val + b_val) / 2.0
else:
result = 0.0
# If the computed result changed, update our internal properties and transmit
if self.value != result:
self.value = result
# Update the two text fields so the user sees the numeric result:
self.set_property('calc_result', str(result))
self.set_property('value', str(result)) # <= This is the critical step
# Let downstream nodes know there's new data:
self.transmit_data(result)
def on_input_connected(self, input_port, output_port):
pass
def on_input_disconnected(self, input_port, output_port):
pass
def property_changed(self, property_name):
pass
def receive_data(self, data, source_port_name=None):
pass
def transmit_data(self, data):
output_port = self.output(0)
if output_port and output_port.connected_ports():
for connected_port in output_port.connected_ports():
connected_node = connected_port.node()
if hasattr(connected_node, 'receive_data'):
try:
# Attempt to convert to int if possible, else float
data_int = int(data)
connected_node.receive_data(data_int, source_port_name='Result')
except ValueError:
connected_node.receive_data(data, source_port_name='Result')