-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlof.py
168 lines (155 loc) · 8.52 KB
/
lof.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/python
# -*- coding: utf8 -*-
"""
lof
~~~~~~~~~~~~
This module implements the Local Outlier Factor algorithm.
:copyright: (c) 2013 by Damjan Kužnar.
:license: GNU GPL v2, see LICENSE for more details.
"""
from __future__ import division
import warnings
def distance_euclidean(instance1, instance2):
    """Computes the distance between two instances of equal length.

    Numeric attributes contribute their plain difference; every other
    attribute is compared as a string and contributes 0 when equal, 1 otherwise.
    The result is the root of the *mean* squared per-attribute difference, i.e.
    the Euclidean distance scaled by 1/sqrt(number of attributes).
    Raises: AttributeError when the instances differ in length or when a pair
    of attributes mixes a number with a non-number.
    Returns: Euclidean (root mean squared) distance
    Signature: ((attr_1_1, attr_1_2, ...), (attr_2_1, attr_2_2, ...)) -> float"""
    from numbers import Number

    def classify(value):
        """Returns (float, float(value)) for numbers and (str, str(value)) otherwise."""
        if isinstance(value, Number):
            return float, float(value)
        return str, str(value)

    # both instances must describe the same number of attributes
    if len(instance1) != len(instance2):
        raise AttributeError("Instances have different number of arguments.")

    squared_differences = []
    for left, right in zip(instance1, instance2):
        left_kind, left = classify(left)
        right_kind, right = classify(right)
        # a number can only be compared with a number, a string with a string
        if left_kind != right_kind:
            raise AttributeError("Instances have different data types.")
        if left_kind is float:
            delta = left - right
        elif left == right:
            # identical non-numeric values
            delta = 0
        else:
            # differing non-numeric values count as one unit apart
            delta = 1
        squared_differences.append(delta ** 2)

    # root of the mean squared difference
    return (sum(squared_differences) / len(squared_differences)) ** 0.5
class LOF:
    """Helper class bundling a set of instances with optional min-max
    normalization and a distance function for LOF computations."""

    def __init__(self, instances, normalize=True, distance_function=distance_euclidean):
        """Stores the instances and settings; normalizes the instances right away when requested."""
        self.instances = instances
        self.normalize = normalize
        self.distance_function = distance_function
        if normalize:
            self.normalize_instances()

    def compute_instance_attribute_bounds(self):
        """Finds the smallest and largest value of every attribute over all stored instances.

        The bounds are stored in min_attribute_values / max_attribute_values.
        A warning is issued for attributes whose value never varies."""
        attribute_count = len(self.instances[0])
        lower = [float("inf")] * attribute_count
        upper = [float("-inf")] * attribute_count
        for row in self.instances:
            # element-wise running minimum / maximum
            lower = tuple(map(min, lower, row))
            upper = tuple(map(max, upper, row))

        spans = [high - low for high, low in zip(upper, lower)]
        if not all(spans):
            # report the constant attributes using 1-based positions
            problematic_dimensions = ", ".join(
                str(position + 1) for position, span in enumerate(spans) if span == 0)
            warnings.warn("No data variation in dimensions: %s. You should check your data or disable normalization." % problematic_dimensions)

        self.max_attribute_values = upper
        self.min_attribute_values = lower

    def normalize_instances(self):
        """Normalizes the instances and stores the information for rescaling new instances."""
        # bounds are computed once and reused afterwards
        if not hasattr(self, "max_attribute_values"):
            self.compute_instance_attribute_bounds()
        self.instances = [self.normalize_instance(row) for row in self.instances]

    def normalize_instance(self, instance):
        """Rescales one instance with the stored bounds: (value - min) / (max - min).

        Attributes without a positive span (max - min) are mapped to 0.
        Returns: tuple of rescaled attribute values"""
        def rescale(value, upper, lower):
            span = upper - lower
            if span > 0:
                return (value - lower) / span
            return 0

        return tuple(map(rescale, instance, self.max_attribute_values, self.min_attribute_values))

    def local_outlier_factor(self, min_pts, instance):
        """The (local) outlier factor of instance captures the degree to which we call instance an outlier.
        min_pts is a parameter that is specifying a minimum number of instances to consider for computing LOF value.
        The instance is normalized first when normalization is enabled, then scored
        against the stored instances using the configured distance function.
        Returns: local outlier factor
        Signature: (int, (attr1, attr2, ...)) -> float"""
        query = self.normalize_instance(instance) if self.normalize else instance
        return local_outlier_factor(min_pts, query, self.instances, distance_function=self.distance_function)
def k_distance(k, instance, instances, distance_function=distance_euclidean):
    """Computes the k-distance of instance as defined in paper. It also gathers the set of k-distance neighbours.

    The k-distance is the distance to the k-th nearest object, counting
    objects rather than distinct distance values. The neighbourhood holds
    every object whose distance does not exceed the k-distance, so objects
    tied with the k-th one are all included and it may contain more than k
    objects. When fewer than k instances are available, all of them are
    returned together with the largest distance found.
    k is expected to be a positive integer.
    Raises: IndexError when instances is empty
    Returns: (k-distance, k-distance neighbours)
    Signature: (int, (attr1, attr2, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> (float, ((attr_j_1, ...),(attr_k_1, ...), ...))"""
    #TODO: implement caching
    # group the instances by their distance to `instance`
    by_distance = {}
    for candidate in instances:
        distance_value = distance_function(instance, candidate)
        by_distance.setdefault(distance_value, []).append(candidate)

    if not by_distance:
        raise IndexError("Cannot compute k-distance: no instances were given.")

    neighbours = []
    k_distance_value = None
    # walk the tie groups from nearest to farthest until k objects are covered
    for k_distance_value in sorted(by_distance):
        neighbours.extend(by_distance[k_distance_value])
        if len(neighbours) >= k:
            break
    return k_distance_value, neighbours
def reachability_distance(k, instance1, instance2, instances, distance_function=distance_euclidean):
    """The reachability distance of instance1 with respect to instance2.

    It is the larger of the k-distance of instance2 (measured within instances)
    and the direct distance between instance1 and instance2.
    Returns: reachability distance
    Signature: (int, (attr_1_1, ...), (attr_2_1, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> float"""
    # only the k-distance is needed here, the neighbour list is ignored
    k_distance_value, _ = k_distance(k, instance2, instances, distance_function=distance_function)
    direct_distance = distance_function(instance1, instance2)
    return max(k_distance_value, direct_distance)
def local_reachability_density(min_pts, instance, instances, **kwargs):
    """Local reachability density of instance is the inverse of the average reachability
    distance based on the min_pts-nearest neighbors of instance.

    Extra keyword arguments are forwarded to k_distance and reachability_distance.
    When every reachability distance is zero a warning is issued and infinity is returned.
    Returns: local reachability density
    Signature: (int, (attr1, attr2, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> float"""
    _, neighbours = k_distance(min_pts, instance, instances, **kwargs)
    reachability_distances = [
        reachability_distance(min_pts, instance, neighbour, instances, **kwargs)
        for neighbour in neighbours
    ]
    if any(reachability_distances):
        # inverse of the mean reachability distance
        return len(neighbours) / sum(reachability_distances)
    # all distances are zero: the density is unbounded
    warnings.warn("Instance %s (could be normalized) is identical to all the neighbors. Setting local reachability density to inf." % repr(instance))
    return float("inf")
def local_outlier_factor(min_pts, instance, instances, **kwargs):
    """The (local) outlier factor of instance captures the degree to which we call instance an outlier.
    min_pts is a parameter that is specifying a minimum number of instances to consider for computing LOF value.

    The factor is the mean, over the neighbours of instance, of the ratio between
    the neighbour's local reachability density and that of instance.
    Extra keyword arguments are forwarded to k_distance and local_reachability_density.
    Returns: local outlier factor
    Signature: (int, (attr1, attr2, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> float"""
    _, neighbours = k_distance(min_pts, instance, instances, **kwargs)
    instance_lrd = local_reachability_density(min_pts, instance, instances, **kwargs)

    def density_ratio(neighbour):
        """Ratio of the neighbour's density (computed without the neighbour itself) to instance's density."""
        # NOTE(review): the set conversion requires hashable instances and
        # collapses duplicate instances - confirm this is intended.
        remaining = set(instances)
        remaining.discard(neighbour)
        neighbour_lrd = local_reachability_density(min_pts, neighbour, remaining, **kwargs)
        return neighbour_lrd / instance_lrd

    lrd_ratios = [density_ratio(neighbour) for neighbour in neighbours]
    return sum(lrd_ratios) / len(neighbours)
def outliers(k, instances, **kwargs):
    """Simple procedure to identify outliers in the dataset.

    Each instance is scored against all the other instances; those with a local
    outlier factor above 1 are reported. Extra keyword arguments are passed to LOF.
    Returns: list of {"lof", "instance", "index"} dicts, highest factor first"""
    flagged = []
    for index, candidate in enumerate(instances):
        # score the candidate against a copy of the data that excludes it
        others = list(instances)
        others.remove(candidate)
        score = LOF(others, **kwargs).local_outlier_factor(k, candidate)
        if score > 1:
            flagged.append({"lof": score, "instance": candidate, "index": index})
    return sorted(flagged, key=lambda entry: entry["lof"], reverse=True)