81 lines
2.0 KiB
Python

from flask import Flask, jsonify
from flowpipe.node import Node
from flowpipe.graph import Graph
from flowpipe.plug import InputPlug, OutputPlug
# Flask application object; the route defined in this file is registered on it.
app = Flask(__name__)
# ===========================
# Define Custom Nodes
# ===========================
class MultiplyNode(INode):
    """Node that multiplies an input value by a factor.

    Inputs:
        factor: Multiplier applied to ``value``.
        value: Value to be multiplied.

    Outputs:
        result: ``value * factor``.
    """

    def __init__(self, *args, **kwargs):
        """Create the node and register its plugs.

        Args:
            *args: Forwarded unchanged to ``INode.__init__``.
            **kwargs: Forwarded unchanged to ``INode.__init__``
                (for example ``name`` and ``graph``).
        """
        super().__init__(*args, **kwargs)
        # Plugs have to be bound to a node instance by name.  They are also
        # kept as attributes so callers can write ``node.factor.value = 2``
        # or ``node.result.connect(...)``.
        self.factor = InputPlug("factor", self)
        self.value = InputPlug("value", self)
        self.result = OutputPlug("result", self)

    def compute(self, factor, value):
        """Multiply the two inputs.

        Args:
            factor: Current value of the ``factor`` input plug.
            value: Current value of the ``value`` input plug.

        Returns:
            A dict mapping the output plug name ``"result"`` to
            ``value * factor``.
        """
        return {"result": value * factor}
class AddNode(INode):
    """Node that adds two input values.

    Inputs:
        input1: First operand.
        input2: Second operand.

    Outputs:
        sum: ``input1 + input2``.
    """

    def __init__(self, *args, **kwargs):
        """Create the node and register its plugs.

        Args:
            *args: Forwarded unchanged to ``INode.__init__``.
            **kwargs: Forwarded unchanged to ``INode.__init__``
                (for example ``name`` and ``graph``).
        """
        super().__init__(*args, **kwargs)
        # Plugs have to be bound to a node instance by name.  They are also
        # kept as attributes so callers can write ``node.input2.value = 3``
        # or ``node.sum.connect(...)``.
        self.input1 = InputPlug("input1", self)
        self.input2 = InputPlug("input2", self)
        self.sum = OutputPlug("sum", self)

    def compute(self, input1, input2):
        """Add the two inputs.

        Args:
            input1: Current value of the ``input1`` input plug.
            input2: Current value of the ``input2`` input plug.

        Returns:
            A dict mapping the output plug name ``"sum"`` to
            ``input1 + input2``.
        """
        return {"sum": input1 + input2}
class OutputNode(INode):
    """Terminal node that passes its input through unchanged.

    Inputs:
        input_value: Value to expose as the workflow result.

    Outputs:
        output_value: The same value that arrived on ``input_value``.
    """

    def __init__(self, *args, **kwargs):
        """Create the node and register its plugs.

        Args:
            *args: Forwarded unchanged to ``INode.__init__``.
            **kwargs: Forwarded unchanged to ``INode.__init__``
                (for example ``name`` and ``graph``).
        """
        super().__init__(*args, **kwargs)
        # Plugs have to be bound to a node instance by name.  They are also
        # kept as attributes so callers can write ``node.output_value.value``.
        self.input_value = InputPlug("input_value", self)
        self.output_value = OutputPlug("output_value", self)

    def compute(self, input_value):
        """Forward the input to the output.

        Args:
            input_value: Current value of the ``input_value`` input plug.

        Returns:
            A dict mapping the output plug name ``"output_value"`` to
            ``input_value``.
        """
        return {"output_value": input_value}
# ===========================
# Define Graph Workflow
# ===========================
def create_workflow(factor=2, value=5, addend=3):
    """Build the sample graph computing ``value * factor + addend``.

    Three nodes are chained: "Multiplier" -> "Adder" -> "Output".

    Args:
        factor: Value set on the multiplier's ``factor`` input (default 2).
        value: Value set on the multiplier's ``value`` input (default 5).
        addend: Value set on the adder's ``input2`` input (default 3).

    Returns:
        The assembled graph, not yet evaluated.  With the default arguments
        the chain computes 5 * 2 + 3 = 13.
    """
    graph = Graph(name="Sample Workflow")

    # Create nodes
    multiply = MultiplyNode(name="Multiplier", graph=graph)
    add = AddNode(name="Adder", graph=graph)
    output = OutputNode(name="Output", graph=graph)

    # Connect nodes
    multiply.result.connect(add.input1)  # Multiply output -> Add input1
    add.sum.connect(output.input_value)  # Add output -> Output node

    # Set static input values on the plugs that have no upstream connection
    multiply.factor.value = factor
    multiply.value.value = value
    add.input2.value = addend

    return graph
@app.route('/run-workflow', methods=['GET'])
def run_workflow():
    """Build and evaluate the sample workflow and return its result as JSON.

    Returns:
        A JSON response of the form ``{"workflow_result": <value>}``, where
        the value is read from the ``output_value`` plug of the node named
        "Output".
    """
    graph = create_workflow()
    graph.evaluate()  # Execute the graph

    # ``graph.nodes`` is a list, so it cannot be indexed by node name; the
    # graph itself supports lookup by name.  The plug is read through the
    # node's ``outputs`` mapping, which is keyed by plug name.
    output_node = graph["Output"]
    result = output_node.outputs["output_value"].value

    return jsonify({"workflow_result": result})
if __name__ == "__main__":
    import os

    # Debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the server execute arbitrary code.  It stays on by default
    # for local development, but can be switched off without editing the file
    # by setting FLASK_DEBUG to 0, false or no.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").lower() not in {
        "",
        "0",
        "false",
        "no",
    }
    app.run(debug=debug_enabled)