| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | """A programs database that implements the evolutionary algorithm.""" |
| | from collections.abc import Mapping, Sequence |
| | import copy |
| | import dataclasses |
| | import time |
| | from typing import Any |
| |
|
| | import numpy as np |
| | import scipy |
| |
|
| | Signature = tuple[float, ...] |
| | ScoresPerTest = Mapping[Any, float] |
| |
|
| |
|
| | def _softmax(logits: np.ndarray, temperature: float) -> np.ndarray: |
| | """Returns the tempered softmax of 1D finite `logits`.""" |
| | if not np.all(np.isfinite(logits)): |
| | non_finites = set(logits[~np.isfinite(logits)]) |
| | raise ValueError(f'`logits` contains non-finite value(s): {non_finites}') |
| | if not np.issubdtype(logits.dtype, np.floating): |
| | logits = np.array(logits, dtype=np.float32) |
| |
|
| | result = scipy.special.softmax(logits / temperature, axis=-1) |
| | |
| | index = np.argmax(result) |
| | result[index] = 1 - np.sum(result[0:index]) - np.sum(result[index+1:]) |
| | return result |
| |
|
| |
|
def _reduce_score(scores_per_test: ScoresPerTest) -> float:
    """Collapses per-test scores into one number: the score of the last test.

    "Last" is the final key in the mapping's iteration order. An empty
    mapping raises `IndexError`.
    """
    test_keys = list(scores_per_test.keys())
    last_test = test_keys[-1]
    return scores_per_test[last_test]
| |
|
| |
|
def _get_signature(scores_per_test: ScoresPerTest) -> Signature:
    """Builds the canonical signature of a set of test scores.

    The signature is the tuple of scores ordered by sorted test key, so two
    mappings with the same key/score pairs yield the same signature
    regardless of their insertion order.
    """
    ordered_tests = sorted(scores_per_test.keys())
    return tuple(scores_per_test[test] for test in ordered_tests)
| |
|
| |
|
@dataclasses.dataclass(frozen=True)
class Prompt:
    """Immutable prompt emitted by the ProgramsDatabase for Samplers.

    Attributes:
        code: Prompt text; it ends with the header of the function that is
            to be completed.
        version_generated: Version suffix of the function to complete, i.e.
            the function is named `_v{version_generated}`.
        island_id: Island whose implementations were used to build the
            prompt. It is used to route the newly generated implementation
            back into that same island.
    """

    code: str
    version_generated: int
    island_id: int
| |
|
| |
|
@dataclasses.dataclass(frozen=True)
class ProgramsDatabaseConfig:
    """Immutable settings for a ProgramsDatabase.

    Attributes:
        functions_per_prompt: How many previously registered programs are
            included in each prompt.
        num_islands: How many islands are maintained as a diversity
            mechanism.
        reset_period: Interval, in seconds, between resets of the weakest
            islands.
        cluster_sampling_temperature_init: Starting temperature of the
            softmax used to sample clusters within an island.
        cluster_sampling_temperature_period: Period of the linear decay of
            the cluster sampling temperature.
    """

    functions_per_prompt: int = 2
    num_islands: int = 4
    reset_period: int = 60 * 60  # One hour, expressed in seconds.
    cluster_sampling_temperature_init: float = 0.1
    cluster_sampling_temperature_period: int = 5
| |
|
| |
|
class ProgramsDatabase:
    """A collection of programs, organized as islands.

    Programs are handled as integer identifiers together with their length
    and per-test scores. For every island the database remembers the best
    program seen so far (identifier, length and scores) so that it can seed
    islands that get reset.
    """

    def __init__(
        self,
        config: ProgramsDatabaseConfig,
    ) -> None:
        """Creates `config.num_islands` empty islands.

        Args:
            config: Settings for the islands and the reset schedule.
        """
        self._config: ProgramsDatabaseConfig = config

        num_islands = config.num_islands
        self._islands: list[Island] = [
            self._new_island() for _ in range(num_islands)]
        self._best_score_per_island: list[float] = (
            [-float('inf')] * num_islands)
        self._best_program_per_island: list[int | None] = (
            [None] * num_islands)
        # Kept alongside the best program so that a founder can be
        # re-registered (registration requires the program length).
        self._best_program_len_per_island: list[int | None] = (
            [None] * num_islands)
        self._best_scores_per_test_per_island: list[ScoresPerTest | None] = (
            [None] * num_islands)

        self._last_reset_time: float = time.time()

    def _new_island(self) -> 'Island':
        """Builds an empty island from the database configuration."""
        return Island(
            self._config.functions_per_prompt,
            self._config.cluster_sampling_temperature_init,
            self._config.cluster_sampling_temperature_period)

    def get_seed(self) -> tuple[int, list[int]]:
        """Samples seed programs from one island chosen uniformly at random.

        Returns:
            A pair `(island_id, seed_ids)`: the chosen island and the
            programs returned by that island's `get_seed`.
        """
        island_id = np.random.randint(len(self._islands))
        seed_ids = self._islands[island_id].get_seed()
        return island_id, seed_ids

    def _register_program_in_island(
        self,
        program: int,
        program_len: int,
        island_id: int,
        scores_per_test: ScoresPerTest,
    ) -> None:
        """Registers `program` in the specified island.

        Also updates the island's best-program record when the reduced score
        of `scores_per_test` beats the island's current best score.
        """
        self._islands[island_id].register_program(
            program, program_len, scores_per_test)
        score = _reduce_score(scores_per_test)
        if score > self._best_score_per_island[island_id]:
            self._best_program_per_island[island_id] = program
            self._best_program_len_per_island[island_id] = program_len
            self._best_scores_per_test_per_island[island_id] = scores_per_test
            self._best_score_per_island[island_id] = score

    def register_program(
        self,
        program: int,
        program_len: int,
        island_id: int | None,
        scores_per_test: ScoresPerTest,
    ) -> None:
        """Registers `program` in the database.

        Args:
            program: Identifier of the program.
            program_len: Length of the program.
            island_id: Island to register the program in, or `None` to
                register it in every island.
            scores_per_test: Scores of the program, keyed by test.
        """
        if island_id is None:
            for target_island_id in range(len(self._islands)):
                self._register_program_in_island(
                    program, program_len, target_island_id, scores_per_test)
        else:
            self._register_program_in_island(
                program, program_len, island_id, scores_per_test)

        # Periodically reset the weaker islands.
        now = time.time()
        if now - self._last_reset_time > self._config.reset_period:
            self._last_reset_time = now
            self.reset_islands()

    def reset_islands(self) -> None:
        """Resets the weaker half of islands.

        Each reset island is replaced by an empty one and then seeded with
        the best program of a randomly chosen surviving island. If that
        surviving island has no best program yet, the reset island is left
        empty.
        """
        # A tiny amount of noise breaks ties between equal best scores.
        noisy_scores = np.asarray(self._best_score_per_island, dtype=float) + (
            np.random.randn(len(self._best_score_per_island)) * 1e-6)
        indices_sorted_by_score: np.ndarray = np.argsort(noisy_scores)
        num_islands_to_reset = self._config.num_islands // 2
        reset_islands_ids = indices_sorted_by_score[:num_islands_to_reset]
        keep_islands_ids = indices_sorted_by_score[num_islands_to_reset:]
        for raw_island_id in reset_islands_ids:
            island_id = int(raw_island_id)
            self._islands[island_id] = self._new_island()
            # Forget the old best record; the island is empty again.
            self._best_score_per_island[island_id] = -float('inf')
            self._best_program_per_island[island_id] = None
            self._best_program_len_per_island[island_id] = None
            self._best_scores_per_test_per_island[island_id] = None

            founder_island_id = int(np.random.choice(keep_islands_ids))
            founder = self._best_program_per_island[founder_island_id]
            founder_len = self._best_program_len_per_island[founder_island_id]
            founder_scores = (
                self._best_scores_per_test_per_island[founder_island_id])
            if founder is None or founder_len is None or founder_scores is None:
                # The surviving island has no program to donate.
                continue
            self._register_program_in_island(
                founder, founder_len, island_id, founder_scores)
| |
|
| |
|
class Island:
    """A sub-population of the programs database.

    Programs are grouped into clusters keyed by the signature of their test
    scores. Seeds are drawn by first sampling clusters (favoring higher
    scores) and then sampling one program from each chosen cluster.
    """

    def __init__(
        self,
        functions_per_prompt: int,
        cluster_sampling_temperature_init: float,
        cluster_sampling_temperature_period: int,
    ) -> None:
        """Creates an empty island.

        Args:
            functions_per_prompt: Maximum number of programs returned by
                `get_seed`.
            cluster_sampling_temperature_init: Initial softmax temperature
                for sampling clusters.
            cluster_sampling_temperature_period: Number of registered
                programs after which the temperature schedule restarts.
        """
        self._functions_per_prompt: int = functions_per_prompt
        self._cluster_sampling_temperature_init = (
            cluster_sampling_temperature_init)
        self._cluster_sampling_temperature_period = (
            cluster_sampling_temperature_period)

        self._clusters: dict[Signature, Cluster] = {}
        self._num_programs: int = 0

    def register_program(
        self,
        program: int,
        program_len: int,
        scores_per_test: ScoresPerTest,
    ) -> None:
        """Stores a program on this island, in its appropriate cluster.

        A new cluster is created when no cluster with the program's
        signature exists yet.
        """
        signature = _get_signature(scores_per_test)
        if signature not in self._clusters:
            score = _reduce_score(scores_per_test)
            self._clusters[signature] = Cluster(
                score, [program], [program_len])
        else:
            self._clusters[signature].register_program(program, program_len)
        self._num_programs += 1

    def get_seed(self) -> list[int]:
        """Samples programs from this island to seed a new prompt.

        Returns:
            Up to `functions_per_prompt` programs, one per sampled cluster
            (clusters are sampled with replacement), ordered by increasing
            cluster score.

        Raises:
            ValueError: If no program has been registered on this island.
        """
        if not self._clusters:
            raise ValueError(
                'Cannot sample a seed from an island without programs.')

        signatures = list(self._clusters.keys())
        cluster_scores = np.array(
            [self._clusters[signature].score for signature in signatures])

        # The temperature decays linearly from its initial value as programs
        # are registered and restarts every `period` programs. It never
        # reaches zero because the remainder is strictly below `period`.
        period = self._cluster_sampling_temperature_period
        temperature = self._cluster_sampling_temperature_init * (
            1 - (self._num_programs % period) / period)
        probabilities = _softmax(cluster_scores, temperature)

        # Early on there may be fewer clusters than requested functions.
        functions_per_prompt = min(
            len(self._clusters), self._functions_per_prompt)

        idx = np.random.choice(
            len(signatures), size=functions_per_prompt, p=probabilities)
        chosen_signatures = [signatures[i] for i in idx]
        implementations = []
        scores = []
        for signature in chosen_signatures:
            cluster = self._clusters[signature]
            implementations.append(cluster.sample_program())
            scores.append(cluster.score)

        indices = np.argsort(scores)
        sorted_implementations = [implementations[i] for i in indices]
        return sorted_implementations
| |
|
| |
|
class Cluster:
    """A cluster of programs on the same island and with the same Signature.

    Stores program identifiers and their lengths in parallel lists.
    """

    def __init__(self, score: float, init_programs: list[int], init_program_lengths: list[int]) -> None:
        """Creates a cluster holding the given initial programs.

        Args:
            score: Reduced score shared by every program in the cluster.
            init_programs: Identifiers of the initial programs.
            init_program_lengths: Lengths of the initial programs, in the
                same order as `init_programs`.
        """
        self._score = score
        # Copy the inputs so that later registrations never mutate the
        # caller's lists.
        self._programs: list[int] = list(init_programs)
        self._lengths: list[int] = list(init_program_lengths)

    @property
    def score(self) -> float:
        """Reduced score of the signature that this cluster represents."""
        return self._score

    def register_program(self, program_idx: int, program_length: int) -> None:
        """Adds the program `program_idx` of length `program_length`."""
        self._programs.append(program_idx)
        self._lengths.append(program_length)

    def sample_program(self) -> int:
        """Samples a program, giving higher probability to shorter programs.

        Lengths are shifted by the minimum length and scaled by the maximum
        length; the negated result is passed through a softmax with
        temperature 1.0 to obtain the sampling probabilities.
        """
        normalized_lengths = (np.array(self._lengths) - min(self._lengths)) / (
            max(self._lengths) + 1e-6)  # Epsilon avoids division by zero.
        probabilities = _softmax(-normalized_lengths, temperature=1.0)
        return np.random.choice(self._programs, p=probabilities)