Source code for turbustat.statistics.cramer.cramer
# Licensed under an MIT open source license - see LICENSE
'''
Implementation of the Cramer Statistic
'''
import numpy as np
from sklearn.metrics.pairwise import pairwise_distances
from ..threeD_to_twoD import _format_data
[docs]class Cramer_Distance(object):
"""
Compute the Cramer distance between two data cubes. The data cubes
are flattened spatially to give 2D objects. We clip off empty channels
and keep only the top quartile in the remaining channels.
Parameters
----------
cube1 : numpy.ndarray
First cube to compare.
cube2 : numpy.ndarray
Second cube to compare.
noise_value1 : float, optional
Noise level in the first cube.
noise_value2 : float, optional
Noise level in the second cube.
data_format : str, optional
Method to arange cube into 2D. Only 'intensity' is currently
implemented.
"""
def __init__(self, cube1, cube2, noise_value1=0.1,
noise_value2=0.1):
super(Cramer_Distance, self).__init__()
self.cube1 = cube1
self.cube2 = cube2
self.noise_value1 = noise_value1
self.noise_value2 = noise_value2
self.data_matrix1 = None
self.data_matrix2 = None
self.distance = None
[docs] def cramer_statistic(self, n_jobs=1):
'''
Applies the Cramer Statistic to the datasets.
Parameters
----------
n_jobs : int, optional
Sets the number of cores to use to calculate
pairwise distances
'''
# Adjust what we call n,m based on the larger dimension.
# Then the looping below is valid.
if self.data_matrix1.shape[0] >= self.data_matrix2.shape[0]:
m = self.data_matrix1.shape[0]
n = self.data_matrix2.shape[0]
larger = self.data_matrix1
smaller = self.data_matrix2
else:
n = self.data_matrix1.shape[0]
m = self.data_matrix2.shape[0]
larger = self.data_matrix2
smaller = self.data_matrix1
pairdist11 = pairwise_distances(
larger, metric="euclidean", n_jobs=n_jobs)
pairdist22 = pairwise_distances(
smaller, metric="euclidean", n_jobs=n_jobs)
pairdist12 = pairwise_distances(
larger, smaller,
metric="euclidean", n_jobs=n_jobs)
term1 = 0.0
term2 = 0.0
term3 = 0.0
for i in range(m):
for j in range(n):
term1 += pairdist12[i, j]
for ii in range(m):
term2 += pairdist11[i, ii]
if i < n:
for jj in range(n):
term3 += pairdist22[i, jj]
m, n = float(m), float(n)
term1 *= (1 / (m * n))
term2 *= (1 / (2 * m ** 2.))
term3 *= (1 / (2 * n ** 2.))
self.distance = (m * n / (m + n)) * (term1 - term2 - term3)
return self
[docs] def distance_metric(self, n_jobs=1):
'''
This serves as a simple wrapper in order to remain with the coding
convention used throughout the rest of this project.
'''
self.format_data()
self.cramer_statistic(n_jobs=n_jobs)
return self