from flask import Flask, jsonify from flowpipe.node import Node from flowpipe.graph import Graph from flowpipe.plug import InputPlug, OutputPlug app = Flask(__name__) # =========================== # Define Custom Nodes # =========================== class MultiplyNode(Node): """Multiplies an input value by a factor""" factor = InputPlug() value = InputPlug() result = OutputPlug() def compute(self): self.result.value = self.value.value * self.factor.value class AddNode(Node): """Adds two input values""" input1 = InputPlug() input2 = InputPlug() sum = OutputPlug() def compute(self): self.sum.value = self.input1.value + self.input2.value class OutputNode(Node): """Outputs the final result""" input_value = InputPlug() output_value = OutputPlug() def compute(self): self.output_value.value = self.input_value.value # =========================== # Define Graph Workflow # =========================== def create_workflow(): """Creates a sample workflow using nodes""" graph = Graph(name="Sample Workflow") # Create nodes multiply = MultiplyNode(name="Multiplier", graph=graph) add = AddNode(name="Adder", graph=graph) output = OutputNode(name="Output", graph=graph) # Connect nodes multiply.result.connect(add.input1) # Multiply output -> Add input1 add.sum.connect(output.input_value) # Add output -> Output node # Set static input values multiply.factor.value = 2 multiply.value.value = 5 # 5 * 2 = 10 add.input2.value = 3 # 10 + 3 = 13 return graph @app.route('/run-workflow', methods=['GET']) def run_workflow(): """Runs the defined node-based workflow""" graph = create_workflow() graph.evaluate() # Execute the graph # Extract the final result from the output node output_node = graph.nodes["Output"] result = output_node.output_value.value return jsonify({"workflow_result": result}) if __name__ == '__main__': app.run(debug=True)