Milestone Fixed Character Node

This commit is contained in:
Nicole Rappe 2025-02-12 00:53:42 -07:00
parent 78e763c05c
commit 75f68b1f59
5 changed files with 176 additions and 52 deletions

View File

@ -2,8 +2,8 @@
"""
Character Status Node
This node represents the character's status. It has no input ports and four output ports:
- HP, MP, FP, EXP.
This node represents the character's status. It has seven output ports:
- HP: Current, HP: Total, MP: Current, MP: Total, FP: Current, FP: Total, EXP.
It polls an API endpoint (http://127.0.0.1:5000/data) every 500 ms to update its values.
If the API call is successful, the node's title is set to "Character Status (API Connected)".
If the API is down or returns an error, the title is set to "Character Status (API Disconnected)".
@ -12,6 +12,7 @@ If the API is down or returns an error, the title is set to "Character Status (A
from NodeGraphQt import BaseNode
from Qt import QtCore, QtGui
import requests
import traceback
def get_draw_stat_port(color, border_color=None, alpha=127):
"""
@ -24,19 +25,16 @@ def get_draw_stat_port(color, border_color=None, alpha=127):
def painter_func(painter, rect, info):
painter.save()
# Draw the port circle.
pen = QtGui.QPen(QtGui.QColor(*border_color))
pen.setWidth(1.8)
painter.setPen(pen)
semi_transparent_color = QtGui.QColor(color[0], color[1], color[2], alpha)
painter.setBrush(semi_transparent_color)
painter.drawEllipse(rect)
# Draw the label and current value.
port = info.get('port')
if port is not None:
node = port.node()
stat = port.name()
# Use the node's 'values' dictionary if available.
value = node.values.get(stat, "N/A") if hasattr(node, 'values') else "N/A"
text_rect = rect.adjusted(rect.width() + 4, 0, rect.width() + 70, 0)
painter.setPen(QtGui.QColor(0, 0, 0))
@ -51,16 +49,27 @@ class CharacterStatusNode(BaseNode):
def __init__(self):
super(CharacterStatusNode, self).__init__()
# Initialize the output values as a dictionary.
self.values = {"HP": "N/A", "MP": "N/A", "FP": "N/A", "EXP": "N/A"}
# Add output ports for each stat with custom painters.
self.add_output("HP", painter_func=get_draw_stat_port((255, 0, 0))) # Red for HP
self.add_output("MP", painter_func=get_draw_stat_port((0, 0, 255))) # Blue for MP
self.add_output("FP", painter_func=get_draw_stat_port((0, 255, 0))) # Green for FP
self.add_output("EXP", painter_func=get_draw_stat_port((127, 255, 212))) # Aquamarine for EXP
# Set an initial title.
# Define exact expected keys to avoid transformation mismatches
self.values = {
"HP: Current": "N/A", "HP: Total": "N/A",
"MP: Current": "N/A", "MP: Total": "N/A",
"FP: Current": "N/A", "FP: Total": "N/A",
"EXP": "N/A"
}
# Add output ports
self.add_output("HP: Current", painter_func=get_draw_stat_port((255, 0, 0)))
self.add_output("HP: Total", painter_func=get_draw_stat_port((255, 0, 0)))
self.add_output("MP: Current", painter_func=get_draw_stat_port((0, 0, 255)))
self.add_output("MP: Total", painter_func=get_draw_stat_port((0, 0, 255)))
self.add_output("FP: Current", painter_func=get_draw_stat_port((0, 255, 0)))
self.add_output("FP: Total", painter_func=get_draw_stat_port((0, 255, 0)))
self.add_output("EXP", painter_func=get_draw_stat_port((127, 255, 212)))
self.set_name("Character Status (API Disconnected)")
# Create a QTimer that polls the API every 500ms.
# Start polling timer
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.poll_api)
self.timer.start(500)
@ -68,22 +77,77 @@ class CharacterStatusNode(BaseNode):
def poll_api(self):
"""
Polls the API endpoint to retrieve the latest character stats and updates
the node's internal values. Expects a JSON response with keys:
- "hp", "mp", "fp", "exp"
the node's internal values.
"""
try:
response = requests.get("http://127.0.0.1:5000/data", timeout=1)
if response.status_code == 200:
data = response.json()
# Update the values dictionary.
self.values["HP"] = data.get("hp", "0/0")
self.values["MP"] = data.get("mp", "0/0")
self.values["FP"] = data.get("fp", "0/0")
self.values["EXP"] = data.get("exp", "0.0000")
self.set_name("Character Status (API Connected)")
self.update()
if isinstance(data, dict):
try:
for key, value in data.items():
# Ensure the keys match the expected ones exactly
formatted_key = {
"hp_current": "HP: Current",
"hp_total": "HP: Total",
"mp_current": "MP: Current",
"mp_total": "MP: Total",
"fp_current": "FP: Current",
"fp_total": "FP: Total",
"exp": "EXP"
}.get(key, key) # Use mapping or fallback to raw key
if formatted_key in self.values:
self.values[formatted_key] = str(value)
else:
print(f"[WARNING] Unexpected API key: {key} (not mapped)")
self.set_name("Character Status (API Connected)")
self.update()
self.transmit_data()
except Exception as e:
print("[ERROR] Error processing API response data:", e)
print("[ERROR] Stack Trace:\n", traceback.format_exc())
else:
print("[ERROR] Unexpected API response format (not a dict):", data)
self.set_name("Character Status (API Disconnected)")
else:
print(f"[ERROR] API request failed with status code {response.status_code}")
self.set_name("Character Status (API Disconnected)")
except Exception as e:
self.set_name("Character Status (API Disconnected)")
print("Error polling API in CharacterStatusNode:", e)
print("[ERROR] Error polling API in CharacterStatusNode:", str(e))
print("[ERROR] Stack Trace:\n", traceback.format_exc())
def transmit_data(self):
"""
Sends the updated character stats to connected nodes.
"""
for stat, value in self.values.items():
try:
port = self.get_output(stat)
if port is None:
print(f"[ERROR] Port '{stat}' not found in node outputs. Skipping...")
continue
if port.connected_ports():
for connected_port in port.connected_ports():
connected_node = connected_port.node()
if hasattr(connected_node, 'receive_data'):
try:
connected_node.receive_data(value, stat)
except Exception as e:
print(f"[ERROR] Error transmitting data to {connected_node}: {e}")
print("[ERROR] Stack Trace:\n", traceback.format_exc())
else:
print(f"[WARNING] Connected node {connected_node} does not have receive_data method.")
except Exception as e:
print(f"[ERROR] Error while handling port {stat}: {e}")
print("[ERROR] Stack Trace:\n", traceback.format_exc())

View File

@ -1,3 +1,20 @@
#!/usr/bin/env python3
"""
Data Node:
- Input: Accepts a value (string, integer, or float) from an input port.
- Output: Outputs the current value, either from the input port or set manually via a text box.
Behavior:
- If both input and output are connected:
- Acts as a passthrough, displaying the input value and transmitting it to the output.
- Manual input is disabled.
- If only the output is connected:
- Allows manual value entry, which is sent to the output.
- If only the input is connected:
- Displays the input value but does not transmit it further.
"""
from NodeGraphQt import BaseNode
class DataNode(BaseNode):
@ -6,13 +23,20 @@ class DataNode(BaseNode):
def __init__(self):
super(DataNode, self).__init__()
# Add input and output ports.
self.add_input('Input')
self.add_output('Output')
# Add a text input widget for manual entry.
self.add_text_input('value', 'Value', text='')
self.manual_input_enabled = True
self.set_name(f"Data Node: {self.get_property('value')}")
# Initialize the value from the widget property.
self.process_widget_event()
self.set_name(f"Data Node: {self.value}")
def post_create(self):
"""
Called after the node's widget is fully created.
Connect the text input widget's textChanged signal to process_widget_event.
"""
text_widget = self.get_widget('value')
if text_widget is not None:
try:
@ -21,55 +45,82 @@ class DataNode(BaseNode):
print("Error connecting textChanged signal:", e)
def process_widget_event(self, event=None):
if self.manual_input_enabled:
current_text = self.get_property('value')
self.set_name(f"Data Node: {current_text}")
self.transmit_data(current_text)
"""
Reads the current text from the node's property and updates the node's internal value.
"""
current_text = self.get_property('value')
self.value = current_text
self.set_name(f"Data Node: {self.value}")
def property_changed(self, property_name):
"""
Called when a node property changes. If the 'value' property changes,
update the internal value.
"""
if property_name == 'value':
self.process_widget_event()
def update_stream(self):
"""
Updates the node's behavior based on the connection states.
"""
input_port = self.input(0)
output_port = self.output(0)
if input_port.connected_ports() and output_port.connected_ports():
self.manual_input_enabled = False
# Both input and output are connected; act as passthrough.
self.set_property('value', '')
self.get_widget('value').setEnabled(False)
input_value = input_port.connected_ports()[0].node().get_property('value')
self.set_property('value', input_value)
elif output_port.connected_ports():
self.manual_input_enabled = True
# Only output is connected; allow manual input.
self.get_widget('value').setEnabled(True)
self.transmit_data(self.get_property('value'))
elif input_port.connected_ports():
self.manual_input_enabled = False
# Only input is connected; display input value.
self.get_widget('value').setEnabled(False)
input_value = input_port.connected_ports()[0].node().get_property('value')
self.set_property('value', input_value)
else:
self.manual_input_enabled = True
# Neither input nor output is connected; allow manual input.
self.get_widget('value').setEnabled(True)
def on_input_connected(self, input_port, output_port):
"""
Called when an input port is connected.
"""
self.update_stream()
def on_input_disconnected(self, input_port, output_port):
"""
Called when an input port is disconnected.
"""
self.update_stream()
def on_output_connected(self, output_port, input_port):
"""
Called when an output port is connected.
"""
self.update_stream()
def on_output_disconnected(self, output_port, input_port):
"""
Called when an output port is disconnected.
"""
self.update_stream()
def receive_data(self, data, source_port_name=None):
if not self.manual_input_enabled:
self.set_property('value', data)
self.set_name(f"Data Node: {data}")
self.transmit_data(data)
"""
Receives data from connected nodes and updates the internal value.
"""
print(f"DataNode received data from {source_port_name}: {data}") # Debugging
self.set_property('value', str(data)) # Ensure it's always stored as a string
self.set_name(f"Data Node: {data}")
def transmit_data(self, data):
# Transmit data further if there's an output connection
output_port = self.output(0)
if output_port and output_port.connected_ports():
for connected_port in output_port.connected_ports():
connected_node = connected_port.node()
if hasattr(connected_node, 'receive_data'):
connected_node.receive_data(data)
connected_node.receive_data(data, source_port_name)

View File

@ -4,6 +4,9 @@ Collector Process:
- Runs the OCR engine.
- Updates OCR data every 0.5 seconds.
- Exposes the latest data via an HTTP API using Flask.
This version splits the HP, MP, and FP values into 'current' and 'total' before
sending them via the API, so the Character Status Node can ingest them directly.
"""
import time
@ -14,10 +17,13 @@ app = Flask(__name__)
# Global variable to hold the latest stats (HP, MP, FP, EXP)
latest_data = {
"hp": "0/0",
"mp": "0/0",
"fp": "0/0",
"exp": "0.0000"
"hp_current": 0,
"hp_total": 0,
"mp_current": 0,
"mp_total": 0,
"fp_current": 0,
"fp_total": 0,
"exp": 0.0000
}
def ocr_collector():
@ -30,18 +36,21 @@ def ocr_collector():
while True:
# Simulate updating stats:
hp_current = 50 + counter % 10
hp_max = 100
hp_total = 100
mp_current = 30 + counter % 5
mp_max = 50
mp_total = 50
fp_current = 20 # fixed, for example
fp_max = 20
fp_total = 20
exp_val = round(10.0 + (counter * 0.1), 4)
latest_data = {
"hp": f"{hp_current}/{hp_max}",
"mp": f"{mp_current}/{mp_max}",
"fp": f"{fp_current}/{fp_max}",
"exp": f"{exp_val:.4f}"
"hp_current": hp_current,
"hp_total": hp_total,
"mp_current": mp_current,
"mp_total": mp_total,
"fp_current": fp_current,
"fp_total": fp_total,
"exp": exp_val
}
counter += 1