## Import necessary libraries
import numpy as np
from multiprocessing import Pool
import os
import copy
import simspace as ss
import warnings
warnings.filterwarnings('ignore')
## Define the Fitness Function, which do the simulation first and then calculate the fitness value
#### The fitness value is the distance between the simulation result and the experimental data
def _get_spatial_statistics(simulation, n_state):
simulation.columns = ['celltype', 'x', 'y', 'celltype_ranked']
tmp = simulation[simulation['celltype'] == -1].index
simulation = simulation.drop(tmp)
simulation = simulation.reset_index(drop=True)
simulation['celltype'] = simulation['celltype'].astype(int)
cell_type = simulation['celltype'].unique()
cell_type = np.sort(cell_type)
mi_sim = ss.spatial.integrate_morans_I(simulation['celltype'], simulation[['x', 'y']], cell_type)
if len(mi_sim) < n_state:
mi_sim = np.pad(mi_sim, (0, n_state - len(mi_sim)), 'constant', constant_values=np.nan)
cell_counts = simulation['celltype'].value_counts().sort_index()
mi_sim = [mi_sim[i] for i in np.argsort(-cell_counts)]
if simulation[['x', 'y']].shape[0] < 40:
le_pdf = np.zeros(11)
else:
le_sim = ss.spatial.calculate_local_entropy(simulation['celltype'], simulation[['x', 'y']], k = 10) # k = 20 in Michelle's data
le_pdf, _ = np.histogram(le_sim, bins=[x / 4 for x in range(0, 12)], density=True)
mi_sim = np.array(mi_sim)
sim_res = np.hstack((le_pdf, mi_sim))
return sim_res
## Define the Crossover Function, which generates new individuals by combining the genes of the selected individuals
def _crossover(parent1, parent2, crossover_rate):
if parent1['n_group'] != parent2['n_group']:
warnings.warn("Parents have different number of groups")
return parent1
if np.random.rand() < crossover_rate:
child = {
"n_group": parent1['n_group'],
"n_state": parent1['n_state'],
"niche_theta": np.where(np.random.rand() < 0.5, parent1['niche_theta'], parent2['niche_theta']),
"theta_list": [np.where(np.random.rand() < 0.5, parent1['theta_list'][i], parent2['theta_list'][i]) for i in range(parent1['n_group'])],
"density_replicates": np.where(np.random.rand() < 0.5, parent1['density_replicates'], parent2['density_replicates']),
"phi_replicates": float(np.where(np.random.rand() < 0.5, parent1['phi_replicates'], parent2['phi_replicates']))
}
return child
return parent1
## Define the Mutation Function, which introduces random changes to the genes of the selected individuals
def _mutate(individual, mutation_rate):
for i in range(len(individual['niche_theta'])):
if np.random.rand() < mutation_rate:
individual['niche_theta'][i] += np.random.normal(0, 0.2)
for i in range(len(individual['theta_list'])):
for j in range(len(individual['theta_list'][i])):
if np.random.rand() < mutation_rate:
individual['theta_list'][i][j] += np.random.normal(0, 0.2)
for i in range(len(individual['density_replicates'])):
if np.random.rand() < mutation_rate:
individual['density_replicates'][i] += np.random.normal(0, 0.05)
if individual['density_replicates'][i] < 0:
individual['density_replicates'][i] = 0.01
if np.random.rand() < mutation_rate:
individual['phi_replicates'] += np.random.normal(0, 0.1)
return individual
def _initialize_population(population_size, n_group, n_state, group_mean = 3, seed=0):
"""
Initialize a population of simulation parameters for the evolutionary algorithm.
Args:
population_size: Number of individuals in the population.
n_group: Number of groups in the simulation.
n_state: Number of states in the simulation.
group_mean: Mean number of groups (default is 3).
seed: Random seed for reproducibility (default is 0).
Returns:
param_dist_list: List of dictionaries containing the simulation parameters for each individual.
"""
np.random.seed(seed)
param_dist_list = []
if n_group < 0:
n_group = n_state // group_mean
if n_group < 1:
n_group = 1
for i in range(population_size):
# subgroup_length = split_group(n_state, group_mean)
niche_theta = np.random.uniform(-0.5, 0.5, size=(n_group-1)*n_group//2)
# theta = np.random.uniform(-0.8, 0.8, size=(n_state-1)*n_state//2)
theta_list = []
for i in range(n_group):
theta = np.random.uniform(-0.8, 0.8, size=(n_state-1)*n_state//2)
theta_list.append(theta)
density_replicates = np.random.uniform(0.01, 0.4, size=n_state)
phi_replicates = np.random.uniform(4.4, 5)
param_dist = {
"n_group": n_group,
"n_state": n_state,
"niche_theta": niche_theta,
"theta_list": theta_list,
"density_replicates": density_replicates,
"phi_replicates": phi_replicates}
param_dist_list.append(param_dist)
return param_dist_list
def _parallel_fitness_evaluation(
population,
target,
shape,
custom_neighbor,
num_iterations,
n_iter,
parallel=True,
replicate = 1):
"""
Evaluate the fitness of the entire population in parallel.
"""
if parallel:
cpu_count = _get_cpu_count()
else:
cpu_count = 1
with Pool(processes=cpu_count) as pool:
fitness_scores = pool.starmap(_fitness_function,
[(ind, shape, custom_neighbor, num_iterations, n_iter, target, replicate) for ind in population])
return np.array(fitness_scores)
def _get_cpu_count():
"""
Get the number of CPUs allocated by SLURM or default to all available CPUs.
"""
return int(os.getenv('SLURM_CPUS_PER_TASK', os.cpu_count()))
def _fitness_function(
parameters,
shape,
custom_neighbor,
num_iterations,
n_iter,
target_vector,
replicate = 1):
if replicate == 1:
Sim1 = ss.util.sim_from_params(
parameters=parameters,
shape=shape,
custom_neighbor=custom_neighbor,
num_iteration=num_iterations,
n_iter=n_iter,
seed=0)
sim_res = _get_spatial_statistics(Sim1.meta, parameters['n_state'])
if len(sim_res) != len(target_vector):
fitness = np.linalg.norm(sim_res)
else:
fitness = np.linalg.norm(sim_res - target_vector)
elif replicate > 1:
assert isinstance(replicate, int), "Replicate should be an integer"
fitness = []
for i in range(replicate):
Sim = ss.util.sim_from_params(
parameters=parameters,
shape=shape,
custom_neighbor=custom_neighbor,
num_iteration=num_iterations,
n_iter=n_iter,
seed=i)
sim_res = _get_spatial_statistics(Sim.meta, parameters['n_state'])
if len(sim_res) != len(target_vector):
fitness.append(np.linalg.norm(sim_res))
else:
fitness.append(np.linalg.norm(sim_res - target_vector))
fitness = np.mean(fitness)
else:
raise ValueError("Replicate should be a positive integer")
return fitness
def _tournament_selection(population, fitness_scores, k=3):
selected = []
for _ in range(len(population)):
indices = np.random.choice(len(population), k, replace=False)
best = indices[np.argmin(fitness_scores[indices])]
selected.append(population[best])
return selected
[docs]
def spatial_fit(
target,
population_size: int=50,
generations: int=20,
mutation_rate: float=0.2,
crossover_rate: float=0.6,
shape: tuple=(50, 50),
n_group: int=2,
n_state: int=8,
custom_neighbor: list=ss.spatial.generate_offsets(3, 'manhattan'),
num_iterations: int=4,
n_iter: int=6,
replicate: int=1,
seed: int=0,
parallel: bool=True,
verbose: bool=True
):
"""
Perform the Evolutionary Algorithm to optimize simulation parameters based on a target vector.
Args:
target (list): Target vector to optimize against.
population_size (int): Number of individuals in the population (default is 50).
generations (int): Number of generations to run the algorithm (default is 20).
mutation_rate (float): Probability of mutation for each individual (default is 0.2).
crossover_rate (float): Probability of crossover between individuals (default is 0.6).
shape (tuple): Shape of the simulation grid (default is (50, 50)).
n_group (int): Number of groups in the simulation (default is 2).
n_state (int): Number of states in the simulation (default is 8).
custom_neighbor (list): Custom neighbor offsets for the simulation (default is None).
num_iterations (int): Number of iterations for the simulation (default is 4).
n_iter (int): Number of iterations for the simulation (default is 6).
replicate (int): Number of replicates for the simulation (default is 1).
seed (int): Random seed for reproducibility (default is 0).
parallel (bool): Whether to run the fitness evaluation in parallel (default is True).
verbose (bool): Whether to print progress information (default is True).
Returns:
best_solution (dict): The best solution found during the optimization process.
Raises:
ValueError: If the target vector is not a list or if the population size is not a positive integer.
"""
if not isinstance(target, list):
if not isinstance(target, np.ndarray):
raise ValueError("Target should be a list or numpy array containing local entropy and Moran's I values.")
target = target.tolist()
if not isinstance(population_size, int) or population_size <= 0:
raise ValueError("Population size should be a positive integer.")
## Define (or load) the initial population, which is the simulation parameters
population = _initialize_population(population_size, n_group, n_state, seed=seed)
## Call the Evolutionary Algorithm Function to perform the EA
best_solution = None
best_fitness = float('inf')
for generation in range(generations):
# Evaluate fitness
fitness_scores = _parallel_fitness_evaluation(
population=population,
target=target,
replicate=replicate,
shape=shape,
custom_neighbor=custom_neighbor,
num_iterations=num_iterations,
n_iter=n_iter,
parallel=parallel
)
# Track the best solution
best_gen_idx = np.argmin(fitness_scores)
if fitness_scores[best_gen_idx] < best_fitness:
best_fitness = fitness_scores[best_gen_idx]
best_solution = copy.deepcopy(population[best_gen_idx])
if verbose:
print(f"Generation {generation}: Best Fitness = {best_fitness}")
# Selection
selected_population = _tournament_selection(population, fitness_scores)
# Crossover and mutation
next_generation = []
for i in range(0, population_size, 2):
parent1 = selected_population[i]
parent2 = selected_population[(i+1) % population_size]
child1 = _crossover(parent1, parent2, crossover_rate)
child2 = _crossover(parent2, parent1, crossover_rate)
next_generation.append(_mutate(child1, mutation_rate))
next_generation.append(_mutate(child2, mutation_rate))
population = next_generation
if verbose:
print("Optimization complete!")
print("Best solution:", best_solution)
print("Best fitness:", best_fitness)
return best_solution