局部敏感哈希(原始LSH)Python实现
最近的短期计划是学习一下 Python,而最好的学习方式当然是实践。今天用 Python 实现了一下 LSH 算法,代码比较简陋……(2016.1.17)
origionalLSH.py:
import random
class Bucket(object):
    """One hash bucket: stored feature vectors and the IDs they were added with.

    ``features[i]`` and ``name[i]`` are appended together, so they always
    describe the same stored item.
    """

    def __init__(self):
        # The lists are created per instance; no class-level lists exist, so
        # two buckets can never end up sharing storage.
        self.features = []
        self.name = []

    def addFeature(self, feature, nameID):
        """Append *feature* and its identifier *nameID* to this bucket."""
        self.features.append(feature)
        self.name.append(nameID)

    def size(self):
        """Return the number of features stored in this bucket."""
        return len(self.features)
class Table(object):
    """A hash table made of a fixed number of ``Bucket`` objects."""

    def __init__(self, tableSize):
        """Create *tableSize* empty buckets owned by this table alone."""
        # The bucket list must live on the instance: a class-level list would
        # be shared (and keep growing) across every Table ever constructed.
        self.buckets = [Bucket() for _ in range(tableSize)]

    def addFeature(self, feature, bucketID, nameID):
        """Store *feature* under *nameID* in the bucket at index *bucketID*.

        Raises:
            IndexError: if *bucketID* is outside the table.
        """
        self.buckets[bucketID].addFeature(feature, nameID)

    def size(self):
        """Return the number of buckets in this table."""
        return len(self.buckets)
class LSH(object):
    """Two-level locality-sensitive hash index with an L1 nearest-neighbour search.

    Level 1: each of the ``l`` hash functions samples ``k`` random positions
    from ``d = maxValue * featureDims`` possible ones.  Position ``p`` belongs
    to dimension ``p // maxValue`` and acts as the threshold ``p % maxValue``
    for that dimension, so hashing a feature yields a 0/1 vector of length
    ``k``.

    Level 2: the bit vector is folded into a bucket index with a random
    weighted sum modulo the number of buckets ``m``.

    All ``l`` hash functions store into one shared table of ``m`` buckets.
    """

    def __init__(self, maxValue, l, m, ratio, featureDims):
        """Set up the parameters, the empty table and the level-2 weights.

        Args:
            maxValue: number of threshold positions per feature dimension.
            l: number of level-1 hash functions applied to every feature.
            m: number of buckets in the table.
            ratio: fraction of the ``maxValue * featureDims`` positions that
                each level-1 hash function samples.
            featureDims: number of dimensions of every feature vector.

        Raises:
            ValueError: if ``m`` is smaller than 1 (bucket IDs are computed
                modulo ``m``).
        """
        if m < 1:
            raise ValueError("m (number of buckets) must be at least 1")
        self.__m_k = int(maxValue * featureDims * ratio)
        self.__m_d = maxValue * featureDims
        self.__m_l = l
        self.__m_M = m
        self.__m_FeatureDims = featureDims
        self.__m_MaxValue = maxValue
        self.__m_table = Table(self.__m_M)
        # One (initially empty) list of sampled positions per dimension, for
        # each of the l hash functions.  It is sized by l because train() and
        # search() index it by hash-function number, not by bucket number.
        self.__m_hashFun_level1Subset = [
            [[] for _ in range(featureDims)] for _ in range(l)
        ]
        # Random weights of the level-2 hash, one per level-1 output bit.
        self.__m_hashFun_level2 = [
            random.randint(0, self.__m_M - 1) for _ in range(self.__m_k)
        ]

    def __GetHashFun_level1(self):
        """Draw fresh level-1 hash functions, replacing any previous ones."""
        subsets = []
        for _ in range(self.__m_l):
            per_dim = [[] for _ in range(self.__m_FeatureDims)]
            for _ in range(self.__m_k):
                val = random.randint(0, self.__m_d - 1)  # sampled position
                # Floor division gives the feature dimension the position
                # falls into.
                per_dim[val // self.__m_MaxValue].append(val)
            subsets.append(per_dim)
        # Rebuilt from scratch so that repeated calls never make a hash
        # function longer than the k level-2 weights.
        self.__m_hashFun_level1Subset = subsets

    def __Hash_level1(self, feature, tableID):
        """Return the 0/1 vector of *feature* under hash function *tableID*.

        For every dimension the result holds one bit per sampled position:
        first a 1 for each threshold that is <= the feature value, then a 0
        for each remaining threshold.
        """
        value = []
        per_dim = self.__m_hashFun_level1Subset[tableID]
        for dim, positions in enumerate(per_dim):
            val0 = feature[dim]
            # Offset of the first position belonging to this dimension; each
            # dimension spans MaxValue positions (see __GetHashFun_level1).
            base = self.__m_MaxValue * dim
            ones = sum(1 for pos in positions if pos - base <= val0)
            value.extend([1] * ones)
            value.extend([0] * (len(positions) - ones))
        return value

    def __HashLevel2(self, value):
        """Map a level-1 bit vector to a bucket index in ``[0, m - 1]``."""
        temp = sum(
            self.__m_hashFun_level2[i] * bit for i, bit in enumerate(value)
        )
        return temp % self.__m_M

    def train(self, features):
        """Index *features*; the ID of each feature is its position in the list.

        New hash functions are drawn and the table is emptied first, so the
        stored entries always match the current hash functions.
        """
        self.__GetHashFun_level1()
        self.__m_table = Table(self.__m_M)
        for nameID, feature in enumerate(features):
            for tableID in range(self.__m_l):
                value = self.__Hash_level1(feature, tableID)
                bucketID = self.__HashLevel2(value)
                self.__m_table.addFeature(feature, bucketID, nameID)

    def search(self, feature):
        """Return ``(name, distance)`` of the closest stored candidate.

        Candidates are the contents of the buckets that *feature* hashes to
        under each of the ``l`` hash functions; distance is the L1 distance.
        Returns ``(-1, 10000000)`` when every candidate bucket is empty.
        """
        # Hash the query to collect every candidate bucket.
        buckets = []
        for tableID in range(self.__m_l):
            value = self.__Hash_level1(feature, tableID)
            buckets.append(self.__HashLevel2(value))
        print("查找时遍历痛的个数为: %d" % len(buckets))

        name = -1
        minDist = None
        for bucketID in buckets:
            # Look up the bucket by its hashed ID, not by the loop counter.
            bucket = self.__m_table.buckets[bucketID]
            num = len(bucket.features)
            print("该桶含有的特征个数为:%d" % num)
            for j in range(num):
                dist = self.__calcDist(feature, bucket.features[j])
                if minDist is None or dist < minDist:
                    minDist = dist
                    name = bucket.name[j]
        if minDist is None:
            # Nothing to compare against: keep the historical sentinel result.
            return -1, 10000000
        return name, minDist

    def __calcDist(self, feature0, feature1):
        """Return the L1 (sum of absolute differences) distance."""
        return sum(
            abs(feature0[i] - feature1[i]) for i in range(len(feature0))
        )
# coding: utf-8
test.py:
# Demo script: index random feature vectors with LSH and query one of them.
import random

from origionalLSH import LSH

featureNum = 10000      # number of random feature vectors to index
featureLength = 40      # dimensions per feature vector

# step1: generate random features with integer values in [0, 255]
print("step1: 生成特征")
features = []
for _ in range(featureNum):
    features.append([random.randint(0, 255) for _ in range(featureLength)])

# step2: initialise LSH (maxValue=255, l=10, m=100, ratio=0.12)
print("step2: LSH初始化")
lsh = LSH(255, 10, 100, 0.12, featureLength)

# step3: build the index
print("step3: 开始训练")
lsh.train(features)

# step4: query with a feature that was indexed
print("step4: search:")
name, dist = lsh.search(features[457])
print("最近的距离为:%d, 行号为%d" % (dist, name))
相关文章