| | from copy import deepcopy |
| | from typing import Dict, Any |
| |
|
| | from aiflows.base_flows import SequentialFlow |
| | from aiflows.utils import logging |
| | from abc import ABC |
| |
|
| |
|
# Switch the aiflows logging helper to debug verbosity. This is a top-level
# statement, so it takes effect as soon as this module is imported.
# NOTE(review): presumably this changes verbosity for the whole library, not
# just this module -- confirm against aiflows.utils.logging.
logging.set_verbosity_debug()
# Module-level logger; used by AbstractBossFlow.run to report unbounded runs.
log = logging.get_logger(__name__)
| |
|
class AbstractBossFlow(SequentialFlow, ABC):
    """Abstract memory-planner-controller-executor flow.

    Seen from a higher level, this is an abstract agent driven by several language models
    together with downstream tools such as code interpreters. It is meant to work alongside
    memory management mechanisms, an lm-powered planner and controller, and arbitrary executors.

    *Configuration Parameters*

    - `name` (str): Name of the flow.
    - `description` (str): Description of the flow.
    - `memory_files` (dict): Maps the name of each memory file to the path of that file.
      Typical memory files include plan, logs, code library.
    - `subflows_config`:
        - MemoryReading: reads the content of the memory files into the flow states for later use.
        - Planner: makes a step-by-step plan based on the current goal.
        - CtrlExMem: controller-executor agent with memory reading and memory writing; it
          executes the plan generated by the planner.
    - `max_rounds` (int or None): Passed to the sequential run as the round limit. When it is
      `None`, the flow keeps running until the early exit condition is met.
    - `early_exit_key` (str): The key in the flow state that indicates the early exit condition.
    - `topology` (list): The topology of the flow.

    *Input Interface (expected input)*

    - `goal` (str): The goal from the caller (source flow).

    *Output Interface (expected output)*

    - `result` (str): The result of the flow; it is returned to the caller.
    - `summary` (str): The summary of the flow; it is logged into the logs of the caller flow.

    :param memory_files: Maps the name of each memory file to the path of that file.
    :type memory_files: dict
    """

    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"]

    def __init__(
        self,
        memory_files: Dict[str, Any],
        **kwargs
    ):
        # Every argument other than `memory_files` is forwarded to the parent flow as-is.
        super().__init__(**kwargs)
        self.memory_files = memory_files

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration dictionary.

        The configuration is deep-copied first, so the caller's dictionary is not the one
        handed to the new instance.

        :param config: The configuration dictionary; it must contain the `memory_files` key.
        :type config: dict
        :return: A new instance of `cls` created from the copied configuration.
        """
        flow_config = deepcopy(config)

        # Keyword arguments are evaluated left to right: the memory files are looked up
        # before the subflows are set up from the copied configuration.
        return cls(
            flow_config=flow_config,
            memory_files=flow_config["memory_files"],
            subflows=cls._set_up_subflows(flow_config),
        )

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow.

        :param input_data: The input data; it is supposed to contain 'goal'.
        :type input_data: dict
        :return: The output obtained from the flow state once the sequential run has finished.
        :rtype: dict
        """
        # Load the caller's input into the flow state.
        self._state_update_dict(update_data=input_data)

        # Put the memory file mapping into the flow state as well.
        self._state_update_dict(update_data={"memory_files": self.memory_files})

        # A missing key falls back to 1; an explicit None means there is no round limit.
        rounds_limit = self.flow_config.get("max_rounds", 1)
        if rounds_limit is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=rounds_limit)

        return self._get_output_from_state()