import { Edge } from '@xyflow/react';

import { CustomPythonNodeDefinition } from './customNodes.types';
import { AS_DATA_TYPES } from './data.types';
import {
  getLoggingHelperFunction,
  getSaveResultsHelperFunction,
  getUpdateStateHelperFunction,
} from './helperFunctions';
import {
  ConditionalNode,
  HANDLE_ID_CONDITION,
  HANDLE_ID_FALSE,
  HANDLE_ID_TRUE,
  isConditionalNode,
} from './nodes/ConditionalNode';
import {
  generatePythonFunctionNodeCode,
  isCustomPythonNode,
} from './nodes/CustomPythonNode';
import { GatewayNode, isSubflowGatewayNode } from './nodes/GatewayNode';
import { generateMapCode, isMapNode } from './nodes/MapNode';
import { generateSubflowCode, isSubflowNode } from './nodes/SubflowNode';
import { AS_NODE_STATUS, AsNodes, AsNodesWithGateway } from './nodes/types';
import { FlowData } from './types';
import { basename, flowResultsDir, flowStateFileName } from './utils';

/**
 * Context needed for naming of variables.
 * Holds two lookup tables that map an id to a numeric index.
 */
export type CodeGenContext = {
  // node id -> index of that node
  nodeIndices: Record<string, number>;
  // subflow id -> index of that subflow
  subflowIndices: Record<string, number>;
};

// One indentation level as a string (four spaces).
export const indentStr = '    ';

/**
 * Prefixes every non-empty line of a text with a run of spaces.
 * Empty lines are passed through unchanged, so no whitespace-only lines are produced.
 * @param text The text to indent
 * @param size The number of spaces to put in front of each non-empty line
 * @returns The indented text, lines joined with '\n'
 */
function indent(text: string, size: number): string {
  const padding = ' '.repeat(size);
  const indentedLines: string[] = [];
  for (const line of text.split('\n')) {
    indentedLines.push(line.length > 0 ? padding + line : line);
  }
  return indentedLines.join('\n');
}

/**
 * Throws an Error carrying `msg` when `condition` is falsy; otherwise does nothing.
 * Declared as an assertion function so the compiler narrows `condition` after the call.
 */
function assert(condition: unknown, msg?: string): asserts condition {
  if (condition) {
    return;
  }
  throw new Error(msg);
}

/**
 * Deterministic way to label the Python variable that holds a node's output.
 *
 * - Gateway nodes (subflow parameters): the output's label, or `param_<index>` when the
 *   label is missing or empty, where index is the position of the output handle.
 * - All other nodes: `val_<node id with '-' replaced by '_'>`; when the node has more than
 *   one output handle and a handleId is given, `_<index of the handle>` is appended.
 *
 * Because the name only depends on the node and the handle, producers and consumers
 * compute the same name independently.
 *
 * @param context currently not read by this function
 * @param node the node that produces the value
 * @param handleId id of the output handle; optional for non-gateway nodes
 * @returns the variable name
 * @throws Error if `node` is a gateway node and has no output handle with id `handleId`
 */
export function varname(
  context: CodeGenContext,
  node: AsNodesWithGateway,
  handleId?: string
): string {
  // special handling for gateway nodes, these are always parameters for subflows
  if (isSubflowGatewayNode(node)) {
    const outputs = node.data.connections.outputs;
    const index = outputs.findIndex((param) => param.id === handleId);
    if (index < 0) {
      // fail with a clear message instead of dereferencing outputs[-1]
      throw new Error(
        `Gateway node ${node.id} has no output handle "${handleId}"`
      );
    }
    // `||` (not `??`) so that an empty label also falls back to a usable identifier
    return outputs[index].label || `param_${index}`;
  }

  const cleanedNodeId = node.id.replaceAll('-', '_');

  if (!handleId) {
    return `val_${cleanedNodeId}`;
  }

  const handleIndex = node.data.connections.outputs.findIndex(
    (handle) => handle.id === handleId
  );
  const multipleHandles = node.data.connections.outputs.length > 1;

  // if there are multiple output handles we add its index
  // NOTE(review): an unknown handleId yields index -1 here; callers are expected to pass valid handles
  return `val_${cleanedNodeId}${multipleHandles ? `_${handleIndex}` : ''}`;
}

/**
 * Builds the Python function name of a subflow. The same name is used where the
 * function is defined and where it is called.
 *
 * Resolution order: `'???'` without a subflow, `fn_<name>` when the subflow has a name,
 * `fn_subflow_<index>` when the context knows an index for the subflow id,
 * and plain `fn_subflow` otherwise.
 * @param context
 * @param subflow
 */
export function subflowFn(
  context?: CodeGenContext,
  subflow?: FlowData
): string {
  if (!subflow) {
    return '???';
  }

  const { name, id } = subflow;
  if (name) {
    return `fn_${name}`;
  }

  const index = context?.subflowIndices[id];
  return index != null ? `fn_subflow_${index}` : `fn_subflow`;
}

/**
 * Builds the Python function name for a node type: `fn_` followed by the
 * node type with every '-' replaced by '_'.
 */
export function nodeFn(nodeType: string): string {
  return 'fn_' + nodeType.split('-').join('_');
}

/**
 * Returns the Python expression that feeds a given input handle of a node.
 *
 * For a static config parameter (isConfigParameter and not isDynamic) this is a Python
 * literal built from the parameter value (string, number, bool, or a list of those).
 * Otherwise the incoming edge is looked up and the variable name of the producing
 * node is calculated again. Because the varnames are deterministic, the name is the
 * same here for consuming nodes as where the variable was assigned by the producer.
 *
 * varname = producing_node()
 *
 * def consuming_node(placeholder):
 *    ...
 *
 * ... = consuming_node(varname)
 *
 * String values are emitted through JSON.stringify so quotes, backslashes and
 * control characters are escaped instead of breaking the generated script.
 *
 * @param context
 * @param nodes
 * @param edges
 * @param node the consuming node
 * @param handleId id of the input handle on `node`
 * @returns a Python literal, a variable name, or 'None' when nothing is connected
 * @throws Error for unsupported config parameter types, or when the incoming edge
 *         points to a source node that is not part of `nodes`
 */
export function getIncomingVarNameByHandleId(
  context: CodeGenContext,
  { nodes, edges }: Pick<FlowData, 'nodes' | 'edges'>,
  node: AsNodesWithGateway,
  handleId: string
): string {
  // A JSON string literal (double quotes, backslash escapes) is emitted as the Python string literal
  const pyString = (value: unknown): string => JSON.stringify(String(value));

  const parameter = node.data.connections.inputs.find(
    (input) => input.id === handleId
  );

  // `?.`: a handle that is not declared as input is resolved through its edge below
  if (parameter?.isConfigParameter && !parameter.isDynamic) {
    switch (parameter.typeSchema.type) {
      case AS_DATA_TYPES.STRING: {
        return pyString(parameter.value);
      }
      case AS_DATA_TYPES.NUMBER: {
        return String(Number(parameter.value));
      }
      case AS_DATA_TYPES.BOOL: {
        return parameter.value ? 'True' : 'False';
      }
      case AS_DATA_TYPES.LIST: {
        switch (parameter.typeSchema.items.type) {
          case AS_DATA_TYPES.STRING: {
            return `[${(parameter.value as string[])
              .map((item) => pyString(item))
              .join(', ')}]`;
          }
          case AS_DATA_TYPES.NUMBER: {
            return `[${(parameter.value as number[])
              .map((item) => String(Number(item)))
              .join(', ')}]`;
          }
          case AS_DATA_TYPES.BOOL: {
            return `[${(parameter.value as boolean[])
              .map((item) => (item ? 'True' : 'False'))
              .join(', ')}]`;
          }
          default: {
            throw Error(
              `Unsupported type in config parameter: ${JSON.stringify(
                parameter.typeSchema.items
              )}`
            );
          }
        }
      }
      default: {
        throw Error(
          `Unsupported type in config parameter: ${JSON.stringify(
            parameter.typeSchema
          )}`
        );
      }
    }
  } else {
    const ie = edges.find(
      (edge) => edge.target === node.id && edge.targetHandle === handleId
    );

    // nothing connected to this handle
    if (!ie) return 'None';

    const sourceNode = nodes.find((candidate) => candidate.id === ie.source);
    if (!sourceNode) {
      throw new Error(
        `Edge into node ${node.id} (handle "${handleId}") references unknown source node ${ie.source}`
      );
    }
    return varname(context, sourceNode, ie.sourceHandle ?? undefined);
  }
}

/**
 * Topologically sort the graph into a simple array of nodes (Kahn's algorithm).
 * This gives us the order in which nodes can be executed.
 * It also requires that the graph is a DAG, which is why we don't allow cycle connections.
 *
 * Nodes without incoming edges are emitted first in their input order; successors are
 * appended in edge order as soon as their last incoming edge has been consumed.
 * Edges whose target is not in `nodes` are ignored. An edge whose source is not in
 * `nodes` still counts as an unmet dependency of its target, so the final check fails.
 *
 * Adjacency and node lookups are prepared once, so the sort runs in O(nodes + edges).
 *
 * @param nodes
 * @param edges
 * @returns the nodes in an executable order
 * @throws Error ('cycle??') when not every node could be ordered
 */
function topsort(
  nodes: AsNodesWithGateway[],
  edges: Edge[]
): AsNodesWithGateway[] {
  const inDegree = new Map<string, number>();
  const nodeById = new Map<string, AsNodesWithGateway>();
  for (const node of nodes) {
    inDegree.set(node.id, 0);
    // first occurrence wins, mirroring a linear search by id
    if (!nodeById.has(node.id)) nodeById.set(node.id, node);
  }

  // source id -> target ids, kept in edge order so the result order stays stable
  const targetsBySource = new Map<string, string[]>();
  for (const edge of edges) {
    const degree = inDegree.get(edge.target);
    if (degree !== undefined) {
      inDegree.set(edge.target, degree + 1);
    }
    const targets = targetsBySource.get(edge.source);
    if (targets) {
      targets.push(edge.target);
    } else {
      targetsBySource.set(edge.source, [edge.target]);
    }
  }

  // The result array doubles as the work queue: `head` marks the next node to expand.
  const topologicalOrder = nodes.filter((node) => inDegree.get(node.id) === 0);

  for (let head = 0; head < topologicalOrder.length; head++) {
    const currentNode = topologicalOrder[head];
    for (const targetId of targetsBySource.get(currentNode.id) ?? []) {
      const remaining = inDegree.get(targetId);
      if (remaining === undefined) continue; // edge leaves the node set
      inDegree.set(targetId, remaining - 1);
      if (remaining - 1 === 0) {
        const targetNode = nodeById.get(targetId);
        assert(targetNode !== undefined, 'targetNode undefined');
        topologicalOrder.push(targetNode);
      }
    }
  }
  assert(topologicalOrder.length === nodes.length, 'cycle??');

  return topologicalOrder;
}

/**
 * Collects the nodes that exclusively feed one branch of a conditional node by walking
 * the graph backwards (against edge direction) from that conditional node.
 *
 * @param nodes - Nodes of the graph; each id gets its own predecessor list
 * @param edges - Directed edges with 'source', 'target' and 'targetHandle'
 * @param conditionalNode - ID of the conditional node the walk starts from
 * @param condition - Handle of the branch to follow (HANDLE_ID_TRUE or HANDLE_ID_FALSE)
 *
 * @returns Node IDs in visiting order; the first entry is always `conditionalNode` itself
 *
 * A predecessor is followed (and collected) only if it has not been collected yet and none
 * of its outgoing edges
 *  - leads to a node that is neither collected so far nor the node it was reached from, or
 *  - enters the conditional node through a handle other than `condition`.
 */
function propagateConditionBackwards(
  nodes: AsNodesWithGateway[],
  edges: Edge[],
  conditionalNode: string,
  condition: typeof HANDLE_ID_TRUE | typeof HANDLE_ID_FALSE
) {
  // target id -> ids of the nodes with an edge into it (only for known nodes)
  const predecessors: Record<string, string[]> = {};
  for (const { id } of nodes) {
    predecessors[id] = [];
  }
  for (const { source, target } of edges) {
    if (predecessors[target] !== undefined) {
      predecessors[target].push(source);
    }
  }

  const seen = new Set<string>();
  const collected: string[] = [];

  // true if `edge` makes its source node needed outside of the branch walked so far
  const leavesBranch = (edge: Edge, reachedFrom: string): boolean => {
    const goesElsewhere = !seen.has(edge.target) && edge.target !== reachedFrom;
    const entersOtherHandle =
      edge.target === conditionalNode && edge.targetHandle !== condition;
    return goesElsewhere || entersOtherHandle;
  };

  // depth-first walk against the edge direction; mutates `seen` and `collected`
  const walk = (nodeId: string): void => {
    seen.add(nodeId);
    collected.push(nodeId);

    for (const predecessorId of predecessors[nodeId]) {
      if (seen.has(predecessorId)) continue;

      const usedElsewhere = edges
        .filter((edge) => edge.source === predecessorId)
        .some((edge) => leavesBranch(edge, nodeId));

      if (!usedElsewhere) {
        walk(predecessorId);
      }
    }
  };

  walk(conditionalNode);

  return collected;
}

/**
 * Generate the final line of a subflow definition from its 'out' gateway node.
 * - no gateway inputs: 'pass'
 * - one input: 'return val_a'
 * - several inputs: 'return (val_a, val_b)'
 * Throws if the given gateway is not of type 'out'.
 */
function generateSubflowReturn(
  context: CodeGenContext,
  flow: FlowData,
  node: GatewayNode
) {
  assert(
    node.data.gatewayType === 'out',
    'Cannot generate return statement for gateway with type in.'
  );

  // one returned variable per input handle of the gateway, in handle order
  const returnedVars = node.data.connections.inputs.map((gateway) =>
    getIncomingVarNameByHandleId(context, flow, node, gateway.id)
  );

  switch (returnedVars.length) {
    case 0:
      return 'pass';
    case 1:
      return `return ${returnedVars[0]}`;
    default:
      return `return (${returnedVars.join(', ')})`;
  }
}

/**
 * Generate the subflow function header from its 'in' gateway node,
 * e.g. 'def fn_a(param_0, param_1):'
 *
 * Every output handle of the gateway becomes one parameter, named by its label or,
 * when the label is missing or empty, `param_<position>`.
 *
 * @param subflow the subflow the function is generated for (used for the function name)
 * @param node the 'in' gateway node of the subflow
 * @param context optional naming context, forwarded to subflowFn
 * @throws Error if the given gateway is not of type 'in'
 */
function generateSubflowDefinition(
  subflow: FlowData,
  node: GatewayNode,
  context?: CodeGenContext
) {
  assert(
    node.data.gatewayType === 'in',
    'Cannot generate definition statement for gateway with type out.'
  );

  // `||` (not `??`) so an empty label cannot produce an empty parameter name
  const params = node.data.connections.outputs.map(
    (parameter, i) => parameter.label || `param_${i}`
  );
  return `def ${subflowFn(context, subflow)}(${params.join(', ')}):`;
}

/**
 * Builds the left-hand side of a node call, e.g. the 'val_0, val_1' in 'val_0, val_1 = fn()'.
 * One variable name per output handle, joined with ', '.
 * @returns a string like val_0, val_1 or null if the node has no outputs
 */
export function generateReturnValues(
  context: CodeGenContext,
  node: AsNodes
): string {
  const { outputs } = node.data.connections;
  if (outputs.length === 0) {
    return null;
  }
  return outputs.map((output) => varname(context, node, output.id)).join(', ');
}

/**
 * Aggregate the statements for a given conditional node branch.
 *
 * Picks, in statement order, every statement whose node exclusively feeds the given
 * branch of `node` (as determined by propagateConditionBackwards, without the
 * conditional node itself) and returns those nodes together with their flattened lines.
 *
 * Open question from the original author: isn't this a repeat of propagating the
 * condition backwards? Can we reuse the previous solution?
 */
function aggregateConditionalBlock(
  nodes: AsNodesWithGateway[],
  edges: Edge[],
  statements: [AsNodes, string[]][],
  node: ConditionalNode,
  conditional: typeof HANDLE_ID_TRUE | typeof HANDLE_ID_FALSE
): [AsNodes[], string[]] {
  // first entry is the conditional node itself, which must not end up in its own branch
  const branchNodeIds = propagateConditionBackwards(
    nodes,
    edges,
    node.id,
    conditional
  ).slice(1);

  const blockNodes: AsNodes[] = [];
  const blockLines: string[] = [];
  for (const [statementNode, lines] of statements) {
    if (!branchNodeIds.includes(statementNode.id)) continue;
    blockNodes.push(statementNode);
    for (const line of lines) {
      blockLines.push(line);
    }
  }
  return [blockNodes, blockLines];
}

/**
 * Generate the code for conditional nodes by re-ordering the generated script from the topologically sorted DAG
 * (of course without violating those ordering constraints either).
 * For each conditional node, find the statements that are only needed for one of its branches,
 * then aggregate them into either the if- or the else-branch.
 *
 * Recursive: each call handles one conditional node and recurses with the modified list of statements.
 * Exit if there are no more unvisited conditionals.
 * Otherwise, pick the first unvisited conditional in the list of statements and handle that.
 * This works because a visited conditional for which the code was generated can now be seen as a (grouped) statement
 * that can itself be aggregated into another if/else.
 *
 * Before:
 * val x = "foo"
 * if a:
 *   y = x
 * else:
 *   y = z
 *
 * After:
 * if a:
 *   val x = "foo"
 *   y = x
 * else:
 *   y = z
 *
 * A node statement can't be aggregated into the if/else block if it is also required for something else:
 * val x = "foo"
 * if a:
 *   y = x
 * else:
 *   y = z
 * f = x
 *
 * Open questions from the original author:
 * Does this need to happen with the already generated code? It makes it harder to introspect certain things in the code.
 * And when we generate code, can we just keep one string per statement instead of an array?
 *
 * @param context naming context, passed on to the variable-name helpers
 * @param nodes all nodes of the (possibly already reduced) graph
 * @param edges all edges of the (possibly already reduced) graph
 * @param statements ordered/topologically sorted list of nodes and their generated code except for the if/else nodes
 * @param visited ids of the conditional nodes that were already handled by an outer call
 * @returns the flattened list of generated lines once every conditional has been handled
 */
function optimizeConditionals(
  context: CodeGenContext,
  nodes: AsNodes[],
  edges: Edge[],
  statements: [AsNodes, string[]][],
  visited: string[] = []
): string[] {
  // collect the node, statements, and index of every conditional node
  const conditionals = statements
    .map(([node, statement], i) => [node, statement, i] as const)
    .filter((statement): statement is [ConditionalNode, string[], number] =>
      isConditionalNode(statement[0])
    )
    .filter(([node, ,]) => !visited.includes(node.id));
  // termination condition: no more conditionals left to process
  // flatten the bundled statements and return the newly ordered list of statements
  if (conditionals.length <= 0)
    return statements.flatMap(([, statement]) => statement);
  // take the next conditional according to the topological sort
  const [node, , i] = conditionals[0];
  // get all nodes that exclusively lead to a conditional branch of this node
  // slice(1) is to remove the current conditional node from the lists
  const propagated = [
    ...propagateConditionBackwards(nodes, edges, node.id, HANDLE_ID_TRUE).slice(
      1
    ),
    ...propagateConditionBackwards(
      nodes,
      edges,
      node.id,
      HANDLE_ID_FALSE
    ).slice(1),
  ];
  // collect statements before and after the conditional block
  // (statements that get absorbed into one of the branches are dropped here)
  const beforeConditional = statements
    .slice(0, i)
    .filter(([n]) => !propagated.includes(n.id));
  const afterConditional = statements
    .slice(i + 1)
    .filter(([n]) => !propagated.includes(n.id));
  // build statements for the true and false block of the conditional node
  const [aggregateTrueBlockNodes, aggregateTrueBlockStatments] =
    aggregateConditionalBlock(nodes, edges, statements, node, HANDLE_ID_TRUE);
  const [aggregateFalseBlockNodes, aggregateFalseBlockStatements] =
    aggregateConditionalBlock(nodes, edges, statements, node, HANDLE_ID_FALSE);

  // get variable names for the condition input and for the value of each branch
  const flow = { nodes, edges };
  const conditionVar = getIncomingVarNameByHandleId(
    context,
    flow,
    node,
    HANDLE_ID_CONDITION
  );
  const trueVar = getIncomingVarNameByHandleId(
    context,
    flow,
    node,
    HANDLE_ID_TRUE
  );
  const falseVar = getIncomingVarNameByHandleId(
    context,
    flow,
    node,
    HANDLE_ID_FALSE
  );

  // Find all the edges (= variables) that go into the true block and don't originate in the true block themselves
  const trueBlockIngoingParams = edges
    .filter(
      (edge) =>
        Boolean(
          aggregateTrueBlockNodes.find((node) => edge.target === node.id)
        ) && !aggregateTrueBlockNodes.find((node) => edge.source === node.id)
    )
    .map((edge) =>
      getIncomingVarNameByHandleId(
        context,
        flow,
        nodes.find((node) => node.id === edge.target),
        edge.targetHandle
      )
    );
  // extra guard appended to the `if`: only enter the branch when none of its outside inputs is None
  const entryAllowedTrue =
    trueBlockIngoingParams.length > 0
      ? `and all([param is not None for param in [${trueBlockIngoingParams.join(
          ','
        )}]])`
      : '';

  // same as above, for the false block
  const falseBlockIngoingParams = edges
    .filter(
      (edge) =>
        Boolean(
          aggregateFalseBlockNodes.find((node) => edge.target === node.id)
        ) && !aggregateFalseBlockNodes.find((node) => edge.source === node.id)
    )
    .map((edge) =>
      getIncomingVarNameByHandleId(
        context,
        flow,
        nodes.find((node) => node.id === edge.target),
        edge.targetHandle
      )
    );
  const entryAllowedFalse =
    falseBlockIngoingParams.length > 0
      ? `and all([param is not None for param in [${falseBlockIngoingParams.join(
          ','
        )}]])`
      : '';

  // put together all statements for the current conditional node
  const outputVar = generateReturnValues(context, node);
  const outputIdsStr = node.data.connections.outputs
    .map((output) => `'${output.id}'`)
    .join(', ');
  // note: console.assert only logs on failure, it does not throw
  console.assert(
    outputVar !== null,
    'if/else outputVar can not be null, since the if/else node always has one output'
  );
  // Python snippet for this conditional: the absorbed branch statements are indented by 8 spaces
  // so they sit inside the if/elif bodies; empty lines are stripped from the result.
  const statement = `
${outputVar} = None
try:
    if ${conditionVar} ${entryAllowedTrue}:
        update_state('${node.id}', '${AS_NODE_STATUS.RUNNING}')
${indent(aggregateTrueBlockStatments.join('\n'), 8)}
        if ${trueVar} is None:
            raise Exception("Some return values were none") 
        update_state('${node.id}', '${AS_NODE_STATUS.SUCCESS}')
        ${outputVar} = ${trueVar}
    elif not ${conditionVar} ${entryAllowedFalse}:
        update_state('${node.id}', '${AS_NODE_STATUS.RUNNING}')
${indent(aggregateFalseBlockStatements.join('\n'), 8)}
        if ${falseVar} is None:
            raise Exception("Some return values were none") 
        update_state('${node.id}', '${AS_NODE_STATUS.SUCCESS}')
        ${outputVar} = ${falseVar}
    save_var_to_results('${node.id}', zip([${outputIdsStr}], [${outputVar}]))
except Exception:
    update_state('${node.id}', '${AS_NODE_STATUS.ERROR}')
    save_stacktrace_to_results('${node.id}', traceback.format_exc())
`
    .split('\n')
    .filter((line) => line.length > 0);

  // After an if/else node has aggregated nodes into true or false, these edges have to be changed to point to the if/else node instead
  // The nodes that have been aggregated are used only inside the if/else, so we can be sure this is correct.
  // This grouping also means that they can be removed from the graph, since they no longer play a role.
  const edgesWithoutAbsorbedInputs = edges
    // Filter out all edges that go out from the aggregated nodes, they are no longer needed, because they are in the aggregated black box
    .filter(
      (edge) =>
        ![...aggregateTrueBlockNodes, ...aggregateFalseBlockNodes].find(
          (node) => edge.source === node.id
        )
    )
    // Switch the remaining ingoing edges to the if true or false handle
    // Since we filtered outgoing edges from inside the block, any ingoing that are still remaining must come from outside the block
    .map((edge) => {
      if (aggregateTrueBlockNodes.find((node) => edge.target === node.id)) {
        return { ...edge, target: node.id, targetHandle: HANDLE_ID_TRUE };
      } else if (
        aggregateFalseBlockNodes.find((node) => edge.target === node.id)
      ) {
        return { ...edge, target: node.id, targetHandle: HANDLE_ID_FALSE };
      } else {
        return edge;
      }
    });
  // Only keep the nodes not in an aggregated block
  const nodesWithoutAbsorbedInputs = nodes.filter(
    (node) =>
      ![...aggregateTrueBlockNodes, ...aggregateFalseBlockNodes].find(
        (aggregatedNode) => node.id == aggregatedNode.id
      )
  );
  // continue optimization for remaining conditional nodes
  // the statements of this conditional node are bundled for this purpose
  // and the node is marked as visited so it is not picked again
  return optimizeConditionals(
    context,
    nodesWithoutAbsorbedInputs,
    edgesWithoutAbsorbedInputs,
    [...beforeConditional, [node, statement], ...afterConditional],
    [...visited, node.id]
  );
}

/**
 * Reduces a flow to the given node plus the predecessors needed to run it.
 *
 * Walks the edges backwards from `nodeId`. Every predecessor is kept; predecessors whose
 * status is 'success' are kept but not followed any further. In the result, kept nodes
 * with status 'success' (other than the target itself) get `type: undefined` so they only
 * act as a reference. Edges survive when both of their endpoints are kept.
 *
 * @param flowToReduce Flow to be filtered
 * @param nodeId ID of node to keep with predecessors
 * @returns Copy of the flow with only the necessary nodes and edges
 */
function reduceFlowToNodeAndPredecessors(
  flowToReduce: FlowData,
  nodeId: string
): FlowData {
  const { nodes, edges } = flowToReduce;
  const keptIds = new Set<string>();
  const expandedIds = new Set<string>();
  const pending = [nodeId];

  // backwards traversal starting at the target node
  while (pending.length > 0) {
    const currentId = pending.pop();
    if (expandedIds.has(currentId)) continue;
    expandedIds.add(currentId);
    keptIds.add(currentId);

    for (const edge of edges) {
      if (edge.target !== currentId) continue;

      // every direct predecessor is kept ...
      keptIds.add(edge.source);

      // ... but only predecessors that did not succeed yet are followed further
      const predecessor = nodes.find((node) => node.id === edge.source);
      if (predecessor?.data?.status !== 'success') {
        pending.push(edge.source);
      }
    }
  }

  const reducedNodes = nodes
    .filter((node) => keptIds.has(node.id))
    .map((node) =>
      node.data?.status === 'success' && node.id !== nodeId
        ? // successful nodes stay only as a reference (type=undefined)
          { ...node, type: undefined }
        : node
    );

  const reducedEdges = edges.filter(
    (edge) => keptIds.has(edge.source) && keptIds.has(edge.target)
  );

  return {
    ...flowToReduce,
    nodes: reducedNodes,
    edges: reducedEdges,
  };
}

/**
 * Searches for a node in the given flow (and, recursively, in its subflows) and reduces
 * the flow to what is needed for that node.
 *
 * - Target at this level: the flow is reduced to the target and its predecessors.
 * - Target inside a subflow: that subflow is replaced by its reduced version (moved to the
 *   end of the subflow list). At root level the flow is additionally reduced to the node
 *   that owns the subflow; if that node cannot be found the root keeps no nodes/edges.
 *   Non-root levels keep their full nodes/edges.
 * - Target not found: root returns an emptied flow, other levels return the flow unchanged.
 *
 * @param currentFlow Flow to search in
 * @param targetId ID of the target node
 * @param isRoot Whether this is the root/top-level flow
 * @returns Object containing reduced flow and found status
 */
function searchAndReduceFlow(
  currentFlow: FlowData,
  targetId: string,
  isRoot = true
): { reducedFlow: FlowData; found: boolean } {
  // target lives directly at this level
  if (currentFlow.nodes.some((node) => node.id === targetId)) {
    return {
      reducedFlow: reduceFlowToNodeAndPredecessors(currentFlow, targetId),
      found: true,
    };
  }

  for (const subflow of currentFlow.subflows || []) {
    const inner = searchAndReduceFlow(subflow, targetId, false);
    if (!inner.found) continue;

    // at root level the chain leading to the subflow's owner node is needed as well
    const ownerNode = isRoot
      ? currentFlow.nodes.find((node) => node.id === subflow.parentNodeId)
      : undefined;

    if (ownerNode) {
      const reducedMainFlow = reduceFlowToNodeAndPredecessors(
        currentFlow,
        ownerNode.id
      );
      return {
        reducedFlow: {
          ...reducedMainFlow,
          // drop the original subflow, append its reduced version
          subflows: [
            ...(reducedMainFlow.subflows || []).filter(
              (sf) => sf.parentNodeId !== subflow.parentNodeId
            ),
            inner.reducedFlow,
          ],
        },
        found: true,
      };
    }

    // keep all sibling subflows, swap in the reduced one;
    // non-root levels keep their full nodes/edges for parent chain tracking
    return {
      reducedFlow: {
        ...currentFlow,
        subflows: [
          ...(currentFlow.subflows || []).filter(
            (sf) => sf.parentNodeId !== subflow.parentNodeId
          ),
          inner.reducedFlow,
        ],
        nodes: isRoot ? [] : currentFlow.nodes,
        edges: isRoot ? [] : currentFlow.edges,
      },
      found: true,
    };
  }

  // not found anywhere below this level
  return isRoot
    ? {
        reducedFlow: { ...currentFlow, nodes: [], edges: [], subflows: [] },
        found: false,
      }
    : { reducedFlow: currentFlow, found: false };
}

/**
 * Determines the minimal flow needed to execute a specific target node.
 *
 * The search runs on a shallow copy of the flow. If the target cannot be found anywhere in
 * the flow hierarchy, an error is logged and the original flow is returned unchanged so
 * that everything gets executed.
 *
 * @param originalFlow The complete flow data with nodes, edges, and subflows
 * @param targetNodeId ID of the node to be executed
 * @returns A reduced version of the flow, or `originalFlow` itself when the target is unknown
 */
export function determineHierarchicalFlowForExecution(
  originalFlow: FlowData,
  targetNodeId: string
): FlowData {
  const searchResult = searchAndReduceFlow({ ...originalFlow }, targetNodeId);

  if (searchResult.found) {
    return searchResult.reducedFlow;
  }

  // target unknown: report it and fall back to executing the complete flow
  console.error(
    `Target node ${targetNodeId} not found in flow hierarchy. Falling back to complete flow execution.`
  );
  return originalFlow;
}

/**
 * Collects the ids of all nodes that have to be reset when `nodeId` is reset:
 * the node itself, every node reachable from it along outgoing edges and, if requested,
 * all nodes inside the subflows owned by visited subflow/map nodes.
 *
 * This only determines the set of nodes; it does not perform the reset.
 * Ids are added in discovery order (the returned Set iterates in that order).
 *
 * @param flow
 * @param nodeId id of the node the reset starts from
 * @param resetNodesInsideSubflow when truthy, also include the nodes of nested subflows
 * @returns A Set of node IDs that need to be reset
 */
function resetNodeAndSuccessors(
  flow: FlowData,
  nodeId: string,
  resetNodesInsideSubflow?: boolean
): Set<string> {
  const idsToReset = new Set<string>([nodeId]);
  const expanded = new Set<string>();
  const pending = [nodeId];

  while (pending.length > 0) {
    const currentNodeId = pending.pop();
    if (expanded.has(currentNodeId)) continue;
    expanded.add(currentNodeId);

    const currentNode = flow.nodes.find((node) => node.id === currentNodeId);

    if (
      resetNodesInsideSubflow &&
      (isSubflowNode(currentNode) || isMapNode(currentNode))
    ) {
      // the subflow is linked to its owner node through parentNodeId
      const ownedSubflow = flow.subflows?.find(
        (sf) => sf.parentNodeId === currentNodeId
      );
      if (ownedSubflow) {
        // no target id: every node of the subflow is reset
        for (const innerId of determineNodesToReset(ownedSubflow)) {
          idsToReset.add(innerId);
        }
      }
    }

    // successors are the targets of all edges leaving the current node
    for (const edge of flow.edges) {
      if (edge.source !== currentNodeId) continue;
      idsToReset.add(edge.target);
      pending.push(edge.target);
    }
  }

  return idsToReset;
}

/**
 * Determines all nodes that need to be reset, including nodes in subflows and the
 * successors of the owning subflow node in the parent flow.
 *
 * - Without `targetNodeId`: every node of the flow, plus (recursively) every node of the
 *   subflows owned by nodes of type 'subflow'.
 *   NOTE(review): map nodes are not descended into here, unlike in resetNodeAndSuccessors — confirm this is intended.
 * - Target at this level: the target and its successors, including nested subflow nodes.
 * - Target inside a subflow: the nodes to reset inside that subflow, then the owning node
 *   and its successors at this level, then the same for every recorded ancestor.
 * - Target not found: an empty set (callers use the size to detect this).
 *
 * @param flow The flow data
 * @param targetNodeId Optional ID of node to reset (with successors)
 * @param parentInfo Optional parent flow tracking data
 * @returns Set of all node IDs to reset, in discovery order
 */
function determineNodesToReset(
  flow: FlowData,
  targetNodeId?: string,
  parentInfo?: {
    parentFlow: FlowData;
    subflowNodeId: string;
    ancestorChain: Array<{ flow: FlowData; nodeId: string }>;
  }
): Set<string> {
  const nodesToReset = new Set<string>();
  const addAll = (ids: Set<string>): void => {
    for (const id of ids) {
      nodesToReset.add(id);
    }
  };

  // Case 1: no target - reset the entire flow
  if (!targetNodeId) {
    for (const node of flow.nodes) {
      nodesToReset.add(node.id);

      if (node.type !== 'subflow') continue;
      const ownedSubflow = flow.subflows?.find(
        (sf) => sf.parentNodeId === node.id
      );
      if (ownedSubflow) {
        addAll(determineNodesToReset(ownedSubflow));
      }
    }
    return nodesToReset;
  }

  // Case 2a: the target lives at this level
  if (flow.nodes.some((node) => node.id === targetNodeId)) {
    return resetNodeAndSuccessors(flow, targetNodeId, true);
  }

  // Case 2b: the target might live in one of the subflows
  for (const subflow of flow.subflows || []) {
    const link = { flow, nodeId: subflow.parentNodeId };
    const ancestorChain = parentInfo
      ? [...parentInfo.ancestorChain, link]
      : [link];

    const resetInsideSubflow = determineNodesToReset(subflow, targetNodeId, {
      parentFlow: flow,
      subflowNodeId: subflow.parentNodeId,
      ancestorChain,
    });

    // an empty result means the target is not in this subflow
    if (resetInsideSubflow.size === 0) continue;

    addAll(resetInsideSubflow);

    // the node owning the subflow and everything downstream of it changes as well
    addAll(resetNodeAndSuccessors(flow, subflow.parentNodeId));

    // also travel up the ancestor chain handed in by the caller
    if (parentInfo?.ancestorChain.length > 0) {
      for (const ancestor of parentInfo.ancestorChain) {
        addAll(resetNodeAndSuccessors(ancestor.flow, ancestor.nodeId));
      }
    }

    return nodesToReset;
  }

  return nodesToReset;
}

/**
 * Produces a Python script for resetting a target node and all its successor nodes.
 *
 * The script consists of an `import os` line followed by one
 * `update_state('<node id>', '<WAITING status>')` call per node to reset.
 * `update_state` itself is not defined by this script.
 *
 * @param flow
 * @param flowPath e.g. flowdesigner.asr/2.asflow (currently not needed for the generated script)
 * @param targetNodeId node to reset together with its successors
 * @returns the script as a single string
 */
export function resetNodesCodegen(
  flow: FlowData,
  flowPath: string,
  targetNodeId: string
): string {
  const imports = `
import os
`;

  const resetStatements = Array.from(
    determineNodesToReset(flow, targetNodeId),
    (nodeId) => `update_state('${nodeId}', '${AS_NODE_STATUS.WAITING}')`
  ).join('\n');

  return imports + resetStatements;
}

/**
 * Takes a flow of nodes and edges and produces the generated script as a single string.
 * If a targetNodeId is provided, the flow is first reduced with
 * `determineHierarchicalFlowForExecution`, so the flow will only be executed up until the target node.
 *
 * Layout of the result:
 * - Root level flow (no gateway nodes): the save-results, update-state and logging helper
 *   functions, followed by the subflow function definitions, the custom python function
 *   definitions and the node statements, all joined by line breaks.
 * - Subflow (contains gateway nodes): the subflow function definition followed by the
 *   statements (and the return statement, if an 'out' gateway exists) indented by 4 spaces.
 *
 * Subflows are generated recursively; the context of the current flow is handed down as
 * `oldContext` and no target node is passed on.
 *
 * @param flow The flow to generate code for
 * @param nodeDefinitions Definitions of the custom python nodes; provides the function templates
 * @param flowPath e.g. flowdesigner.asr/2.asflow
 * @param oldContext Context of the parent flow; used for the subflow function definition
 * @param targetNodeId Optional id of the node up to which the flow should be executed
 * @returns The generated script
 * @throws Error if a custom python node type used in the flow has no entry in `nodeDefinitions`
 */
export function codegen(
  flow: FlowData,
  nodeDefinitions: CustomPythonNodeDefinition[],
  flowPath: string,
  oldContext?: CodeGenContext,
  targetNodeId?: string
): string {
  if (targetNodeId) {
    flow = determineHierarchicalFlowForExecution(flow, targetNodeId);
  }

  // `subflows` is treated as possibly absent elsewhere in this file, so fall back to "no subflows"
  const { nodes, edges, subflows = [] } = flow;
  const localResultsDir = basename(flowResultsDir(flowPath));
  const localStatePath = localResultsDir + '/' + flowStateFileName;

  // Indices are determined by the position in the node/subflow lists
  const nodeIndices = Object.fromEntries(nodes.map((node, i) => [node.id, i]));
  const subflowIndices = Object.fromEntries(
    subflows.map((subflow, i) => [subflow.id, i])
  );
  const context: CodeGenContext = {
    nodeIndices,
    subflowIndices,
  };

  // we define all subflows as separate functions at the top of each function definition
  const subflowStatements = subflows.flatMap((subflow) => {
    return codegen(subflow, nodeDefinitions, flowPath, context);
  });

  // we also define separate functions for all custom python nodes that are used in this subflow
  const usedCustomPythonNodeTypes = [
    ...new Set(nodes.filter(isCustomPythonNode).map((node) => node.data.type)),
  ];
  const customPythonFunctionStatements = usedCustomPythonNodeTypes.flatMap(
    (nodeType) => {
      const definition = nodeDefinitions.find((nd) => nd.type === nodeType);
      // fail with a clear message instead of a TypeError on `undefined.functionTemplate`
      if (!definition) {
        throw new Error(
          `No custom python node definition found for node type '${nodeType}'`
        );
      }
      // replace function name, split at line break
      const statements = definition.functionTemplate
        .replace('_my_function', nodeFn(nodeType))
        .split('\n');
      // empty entry separates consecutive function definitions by a blank line
      return [...statements, ''];
    }
  );

  const orderedNodes = topsort(nodes, edges);
  // remove gateway nodes because they are 'pseudo-nodes' that don't generate a statement
  const orderedNodesWithoutGateways = orderedNodes.filter(
    (node): node is AsNodes =>
      !isSubflowGatewayNode(node) && node.type !== undefined
  );
  // List of nodes and the corresponding generated code of that node
  const orderedStatementLists: [AsNodes, string[]][] =
    orderedNodesWithoutGateways.map((node) => {
      switch (node.type) {
        case 'python_node': {
          const statements = generatePythonFunctionNodeCode({
            context,
            node,
            flow,
          });
          return [node, statements];
        }
        case 'subflow': {
          const statements = generateSubflowCode({
            context,
            node,
            flow,
            localStatePath,
          });
          return [node, statements];
        }
        case 'map': {
          const statements = generateMapCode({
            context,
            node,
            flow,
            localStatePath,
          });
          return [node, statements];
        }
        case 'conditional': {
          // custom handling: conditionals get no statements here, they are handled by optimizeConditionals
          return [node, []];
        }
      }
    });
  const nonGatewayNodes = nodes.filter(
    (node): node is AsNodes => !isSubflowGatewayNode(node)
  );
  const optimizedStatements = optimizeConditionals(
    context,
    nonGatewayNodes,
    edges,
    orderedStatementLists
  );

  const statements = [
    ...subflowStatements,
    ...customPythonFunctionStatements,
    ...optimizedStatements,
  ];

  // a flow that contains gateway nodes is a subflow and is emitted as a function
  const gatewayNodes = nodes.filter(isSubflowGatewayNode);
  const isSubflow = gatewayNodes.length !== 0;
  if (isSubflow) {
    const inGatewayNode = gatewayNodes.find(
      (node) => node.data.gatewayType === 'in'
    );
    const definitionStatement = generateSubflowDefinition(
      flow,
      inGatewayNode,
      oldContext // old context here because the subflow definition is used in the parent
    );

    const outGatewayNode = gatewayNodes.find(
      (node) => node.data.gatewayType === 'out'
    );
    const returnStatement = outGatewayNode
      ? generateSubflowReturn(context, flow, outGatewayNode)
      : '';

    // root level subflow -> indent all statements except the subflow function definition
    return (
      definitionStatement +
      indent(
        `
${statements.join('\n')}
${returnStatement}
    `,
        4
      )
    );
  }

  //TODO: Maybe move this to a separate file

  // The helper functions are only emitted at the root level, so they are only built here
  const saveResultsHelperFunction =
    getSaveResultsHelperFunction(localResultsDir);
  const updateStateHelperFunction = getUpdateStateHelperFunction(
    localResultsDir,
    localStatePath
  );
  const loggingHelperFunction = getLoggingHelperFunction(localResultsDir);

  //Add helper function to the top of the generated code
  // (resulting order: save results, update state, logging)
  statements.unshift(loggingHelperFunction);
  statements.unshift(updateStateHelperFunction);
  statements.unshift(saveResultsHelperFunction);

  // no indentation on the first level
  return statements.join('\n');
}
