import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import libpysal
from esda.moran import Moran
from esda.moran import Moran_Local
from esda.geary import Geary
from libpysal.weights import KNN
from sklearn.neighbors import NearestNeighbors
from scipy.spatial import cKDTree
import scipy.spatial.distance as sdist
[docs]
def generate_offsets(distance,
method: str = 'manhattan',
linear: bool = False):
"""
Generate neighbor offsets based on the specified distance and method.
Args:
distance (int): Distance parameter.
method (str): Method to generate offsets ('manhattan' or 'euclidean').
linear (bool): Whether to include the cell itself in the offsets (default is False).
Returns:
list: List of generated neighbor offsets.
Raises:
ValueError: If the distance is not an integer or if the method is not recognized.
Examples:
>>> from simspace.spatial import generate_offsets
>>> offsets = generate_offsets(1, 'manhattan')
>>> print(offsets)
[(-1, 0), (0, -1), (0, 1), (1, 0)]
>>> offsets = generate_offsets(2, 'euclidean', linear=True)
>>> print(offsets)
[(-2, 0), (-1, -1), (-1, 0), (-1, 1), (0, -2), (0, -1), (0, 1), (0, 2), (1, -1), (1, 0), (1, 1), (2, 0), (0, 0)]
"""
if not isinstance(distance, int):
raise ValueError("The distance must be an integer.")
if method not in ['manhattan', 'euclidean']:
raise ValueError("The method should be either 'manhattan' or 'euclidean'.")
offsets = []
for i in range(-distance, distance+1):
for j in range(-distance, distance+1):
if method == 'manhattan':
if abs(i) + abs(j) <= distance and (i, j) != (0, 0):
offsets.append((i, j))
elif method == 'euclidean':
if np.sqrt(i**2 + j**2) <= distance and (i, j) != (0, 0):
offsets.append((i, j))
if linear:
offsets.append((0, 0)) # Include the cell itself if linear is True
return offsets
[docs]
def generate_offsets3D(distance, method, linear=False):
"""
Generate 3D neighbor offsets based on the specified distance and method.
Args:
distance (int): Distance parameter.
method (str): Method to generate offsets ('manhattan' or 'euclidean').
linear (bool): Whether to include the cell itself in the offsets (default is False).
Returns:
list: List of generated neighbor offsets.
Raises:
ValueError: If the distance is not an integer or if the method is not recognized.
Examples:
>>> offsets = generate_offsets3D(3, 'manhattan')
>>> print(offsets)
"""
if not isinstance(distance, int):
raise ValueError("The distance must be an integer.")
if method not in ['manhattan', 'euclidean']:
raise ValueError("The method should be either 'manhattan' or 'euclidean'.")
offsets = []
for i in range(-distance, distance+1):
for j in range(-distance, distance+1):
for k in range(-distance, distance+1):
if method == 'manhattan':
if abs(i) + abs(j) + abs(k)*2 <= distance and (i, j, k) != (0, 0, 0):
offsets.append((i, j, k))
elif method == 'euclidean':
if np.sqrt(i**2 + j**2 + 4*k**2) <= distance and (i, j, k) != (0, 0, 0):
offsets.append((i, j, k))
if linear:
offsets.append((0, 0, 0))
return offsets
## Calculate Moran's I
[docs]
def calculate_morans_I(
data: pd.DataFrame,
coordinates: pd.DataFrame,
k = 5) -> float:
"""
Calculate Moran's I for a given dataset and spatial weights.
Args:
data: pandas DataFrame containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates. Used for libpysal.cg.KDTree()
k: number of nearest neighbors to consider for spatial weights. Default is 5.
Returns:
morans_I: Moran's I value.
"""
kd = libpysal.cg.KDTree(coordinates)
weights = KNN(kd, k=k)
# Calculate Moran's I
morans_I = Moran(data, weights)
return morans_I.I
[docs]
def integrate_morans_I(data: pd.DataFrame,
coordinates: pd.DataFrame,
typelist) -> list:
"""
Calculate Moran's I for a given dataset and spatial weights.
Args:
data: pandas DataFrame containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates. Used for libpysal.cg.KDTree()
typelist: list of types to calculate Moran's I for.
Returns:
mi_list: List of Moran's I values for each type in typelist.
Raises:
ValueError: If typelist is empty.
"""
if len(typelist) == 0:
raise ValueError("typelist must be a non-empty list or non-empty array.")
mi_list = []
for type in typelist:
tmp = data == type
mi = calculate_morans_I(tmp, coordinates)
mi_list.append(mi)
return mi_list
## Calculate Geary's C
[docs]
def calculate_gearys_C(data: pd.DataFrame,
coordinates: pd.DataFrame,
k: int = 20) -> float:
"""
Calculate Geary's C for a given dataset and spatial weights.
Args:
data: pandas DataFrame or Series containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates.
k: number of nearest neighbors to consider for spatial weights.
"""
kd = libpysal.cg.KDTree(coordinates)
weights = KNN(kd, k=k)
if isinstance(data, np.ndarray) and data.dtype == bool:
data = data.astype(int)
elif isinstance(data, pd.Series) and data.dtype == bool:
data = data.astype(int)
elif isinstance(data, pd.DataFrame):
for col in data.columns:
if data[col].dtype == bool:
data[col] = data[col].astype(int)
# Calculate Geary's C
gearys_C = Geary(data, weights)
return gearys_C.C
[docs]
def integrate_gearys_C(data: pd.DataFrame,
coordinates: pd.DataFrame,
typelist) -> list:
"""
Calculate Geary's C for a given dataset and spatial weights.
Args:
data: pandas DataFrame containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates. Used for libpysal.cg.KDTree()
typelist: list of types to calculate Geary's C for.
Returns:
gc_list: List of Geary's C values for each type in typelist.
Raises:
ValueError: If typelist is empty.
"""
if len(typelist) == 0:
raise ValueError("typelist must be a non-empty list or non-empty array.")
gc_list = []
for type in typelist:
tmp = data == type
gc = calculate_gearys_C(tmp, coordinates)
gc_list.append(gc)
return gc_list
## Calculate variogram
[docs]
def compute_variogram(
coords: np.ndarray,
labels: np.ndarray,
cell_types: list,
n_bins: int = 20,
max_dist: float = None):
"""
Compute cross-variograms between all pairs of cell types.
Args:
coords: numpy array of shape (n_cells, n_dimensions) containing the spatial coordinates.
labels: numpy array of shape (n_cells,) containing the cell type labels.
cell_types: list of unique cell types to consider.
n_bins: number of distance bins to use for the variogram (default is 20).
max_dist: maximum distance to consider for the variogram (default is None, which uses half the maximum distance between points).
Returns:
variograms: dictionary where keys are tuples of cell type pairs and values are tuples of (bin_centers, gamma).
"""
tree = cKDTree(coords)
if max_dist is None:
max_dist = np.linalg.norm(coords.max(0) - coords.min(0)) / 2
bins = np.linspace(0, max_dist, n_bins + 1)
bin_centers = 0.5 * (bins[:-1] + bins[1:])
variograms = {}
for i, c1 in enumerate(cell_types):
I1 = (labels == c1).astype(float)
for j, c2 in enumerate(cell_types[i:], i):
I2 = (labels == c2).astype(float)
diffs = I1[:, None] - I2[None, :]
dists = tree.sparse_distance_matrix(tree, max_dist=max_dist, output_type='ndarray')
# compute squared differences only for pairs within max_dist
gamma = np.zeros(n_bins)
counts = np.zeros(n_bins)
for k in range(len(dists)):
d = dists['dist'][k]
b = np.searchsorted(bins, d) - 1
if b >= 0 and b < n_bins:
i1, i2 = dists['i'][k], dists['j'][k]
val = 0.5 * (I1[i1] - I2[i2])**2
gamma[b] += val
counts[b] += 1
gamma /= np.maximum(counts, 1)
variograms[(c1, c2)] = (bin_centers, gamma)
return variograms
[docs]
def integrate_variogram(
data: pd.DataFrame,
coordinates: pd.DataFrame,
typelist) -> dict:
"""
Compute the average variogram across all cell type pairs.
Args:
data: pandas DataFrame containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates.
typelist: list of unique cell types to consider.
Returns:
variograms: dictionary where keys are tuples of cell type pairs and values are tuples of (bin_centers, gamma).
Raises:
ValueError: If typelist is empty.
"""
if len(typelist) == 0:
raise ValueError("typelist must be a non-empty list or non-empty array.")
variograms = compute_variogram(coordinates.values, data.values, typelist)
return variograms
## Calculate pairwise distance between cell types as cell type interaction score
#### Idea was origninally from scFeatures
[docs]
def calculate_interaction_score(
data: pd.Series | pd.DataFrame,
coordinates: pd.DataFrame | np.ndarray,
typelist: list,
k: int = 50,
summary: str = "mean",
use_knn: bool = True,
) -> pd.DataFrame:
"""
Compute a type x type matrix of inter-type distances.
By default uses k-NN distances (fast, memory-safe):
- For c1 != c2: distances from each c1 cell to its k nearest c2 neighbors.
- For c1 == c2: distances from each cell to its k nearest *other* cells of the same type
(self-matches removed).
If use_knn=False, computes full pairwise distances:
- For c1 != c2: all |c1| x |c2| distances (uses scipy.spatial.distance.cdist).
- For c1 == c2: all unique pairs within the type (uses pdist).
Args:
data: pd.Series or single-column pd.DataFrame of categorical labels (cell types).
coordinates: (N, d) array of spatial coordinates.
typelist: list of types to include in the output matrix (order preserved).
k: number of neighbors to consider for k-NN (default: 20).
summary: how to summarize the distances ("mean", "median", "min", "max").
use_knn: whether to use k-NN distances (True, default) or full pairwise distances (False).
Returns:
M: pd.DataFrame of shape (len(typelist), len(typelist)) with summarized inter-type distances.
"""
# --- sanitize inputs ---
if isinstance(data, pd.DataFrame):
if data.shape[1] != 1:
raise ValueError("`data` must be a pd.Series or a single-column DataFrame.")
data = data.iloc[:, 0]
if not isinstance(data, pd.Series):
data = pd.Series(np.asarray(data).ravel(), index=getattr(coordinates, "index", None))
# Align order with coordinates
if isinstance(coordinates, pd.DataFrame):
coords = coordinates.values
labels = data.reindex(coordinates.index).values
else:
coords = np.asarray(coordinates)
labels = np.asarray(data)
if coords.ndim != 2:
raise ValueError("`coordinates` must be 2D (N, d).")
if coords.shape[0] != labels.shape[0]:
raise ValueError("`coordinates` and `data` must have the same length.")
# mapping from type -> indices
type_to_idx = {t: np.where(labels == t)[0] for t in typelist}
# helper to summarize an array
def summarize(arr: np.ndarray) -> float:
arr = np.asarray(arr, dtype=float)
if arr.size == 0:
return np.nan
if summary == "mean":
return float(np.mean(arr))
if summary == "median":
return float(np.median(arr))
if summary == "min":
return float(np.min(arr))
if summary == "max":
return float(np.max(arr))
raise ValueError("summary must be one of {'mean','median','min','max'}")
M = pd.DataFrame(index=typelist, columns=typelist, dtype=float)
if use_knn:
# Build one KDTree per c2 (reuse across all c1 queries)
trees = {}
for c2 in typelist:
idx2 = type_to_idx[c2]
if idx2.size == 0:
trees[c2] = None
else:
trees[c2] = cKDTree(coords[idx2])
for c1 in typelist:
idx1 = type_to_idx[c1]
if idx1.size == 0:
M.loc[c1, :] = np.nan
continue
pts1 = coords[idx1]
for c2 in typelist:
idx2 = type_to_idx[c2]
tree2 = trees[c2]
if idx2.size == 0 or tree2 is None:
M.loc[c1, c2] = np.nan
continue
if c1 == c2:
# query same set; remove self-matches
# ask for k+1 neighbors then drop the first (distance 0)
kk = min(k + 1, max(1, idx2.size)) # guard tiny sets
d, _ = tree2.query(pts1, k=kk)
d = np.atleast_2d(d)
if d.shape[1] == 1:
# only self available -> no valid neighbor
M.loc[c1, c2] = np.nan
continue
d = d[:, 1:] # drop self distances
else:
kk = min(k, max(1, idx2.size))
d, _ = tree2.query(pts1, k=kk)
d = np.atleast_2d(d)
# summarize across all queried neighbor distances
M.loc[c1, c2] = summarize(d.ravel())
else:
# Full pairwise (heavier): use scipy distance routines
for c1 in typelist:
idx1 = type_to_idx[c1]
pts1 = coords[idx1]
if idx1.size == 0:
M.loc[c1, :] = np.nan
continue
for c2 in typelist:
idx2 = type_to_idx[c2]
pts2 = coords[idx2]
if idx2.size == 0:
M.loc[c1, c2] = np.nan
continue
if c1 == c2:
# all unique within-type pairs
if pts1.shape[0] < 2:
M.loc[c1, c2] = np.nan
continue
d = sdist.pdist(pts1) # condensed vector
else:
# cross-type all pairs (N1 x N2)
d = sdist.cdist(pts1, pts2).ravel()
M.loc[c1, c2] = summarize(d)
return M
## Calculate local Moran's I
[docs]
def calculate_local_morans_I(data: pd.DataFrame,
coordinates: pd.DataFrame,
k: int = 20) -> np.ndarray:
"""
Calculate local Moran's I for a given dataset and spatial weights.
Args:
data: pandas DataFrame or Series containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates.
k: number of nearest neighbors to consider for spatial weights.
Returns:
local_morans_I: Local Moran's I values.
"""
kd = libpysal.cg.KDTree(coordinates)
weights = libpysal.weights.KNN(kd, k=k)
# Calculate local Moran's I
local_morans_I = Moran_Local(data, weights)
return local_morans_I.Is
## Plot local Moran's I
[docs]
def plot_local_morans_I(
data: pd.DataFrame,
coordinates: pd.DataFrame,
local_morans_I: np.ndarray,
ax=None):
"""
Plot local Moran's I values on a scatter plot.
Args:
data: pandas DataFrame or Series containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates.
local_morans_I: Local Moran's I values.
ax: matplotlib axis object to plot on.
Returns:
ax: matplotlib axis object.
"""
if ax is None:
_, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
else:
ax1, ax2 = ax
# Plot data
ax1.scatter(coordinates.iloc[:, 0], coordinates.iloc[:, 1], c=data, cmap='viridis', s=5)
# Plot local Moran's I values
ax2.scatter(coordinates.iloc[:, 0], coordinates.iloc[:, 1], c=local_morans_I, s=5, alpha=0.75)
return ax1, ax2
## Calculate the local entropy
[docs]
def calculate_local_entropy(
data: pd.DataFrame,
coordinates: pd.DataFrame,
k: int = 20) -> np.ndarray:
"""
Calculate the local entropy for a given dataset and spatial coordinates.
Args:
data: pandas DataFrame or Series containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates.
k: number of nearest neighbors to consider for spatial weights.
Returns:
local_entropy: Local entropy values.
"""
data.reset_index(drop=True, inplace=True)
nbrs = NearestNeighbors(n_neighbors=k).fit(coordinates)
_, indices = nbrs.kneighbors(coordinates)
# Calculate the entropy for each cell
local_entropy = []
for i in range(len(data)):
neighbors = indices[i]
neighbor_values = data[neighbors]
_, value_counts = np.unique(neighbor_values, return_counts=True)
probabilities = value_counts / np.sum(value_counts)
entropy = -np.sum(probabilities * np.log2(probabilities))
local_entropy.append(entropy)
return local_entropy
## histogram of local entropy
[docs]
def plot_local_entropy(
local_entropy: np.ndarray,
ax=None) -> plt.Axes:
"""
Plot a histogram of local entropy values.
Args:
local_entropy: Local entropy values.
ax: matplotlib axis object to plot on.
Returns:
ax: matplotlib axis object.
"""
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=(6, 6))
ax.hist(local_entropy, bins=50, alpha=0.75)
ax.set_xlabel('Local entropy')
ax.set_ylabel('Frequency')
return ax
[docs]
def spatial_stat(
data: pd.DataFrame,
coordinates: pd.DataFrame,
typelist: list) -> np.ndarray:
"""
Calculate moran's I and local entropy for a given dataset.
Args:
data: pandas DataFrame containing the variable of interest.
coordinates: numpy array or pandas DataFrame containing the spatial coordinates.
typelist: list of types to calculate Moran's I for.
Returns:
res: numpy array containing moran's I and local entropy values.
"""
morans_I = integrate_morans_I(data, coordinates, typelist)
local_entropy = calculate_local_entropy(data, coordinates)
res = np.array([morans_I, local_entropy])
return res