Python中FFT循环提速(带`np.einsum`)

2022-04-02 00:00:00 python numpy fft loops correlation

问题描述

问题:我想用np.einsum加速我的包含大量乘积和求和的python循环,但我也对任何其他解决方案持开放态度。

我的函数采用(n,n,3)形状的向量配置S(我的情况:n=72),并对N*N个点的相关函数进行傅立叶变换。相关函数定义为每个向量与其他向量的乘积。这乘以向量位置乘以kx和ky值的余弦函数。每个位置i,j最后求和得到k-空间p,m中的一点:

def spin_spin(S,N):
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    chi = np.zeros((N,N))
    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N)

    x=np.reshape(triangular(n)[0],(n**2))
    y=np.reshape(triangular(n)[1],(n**2))
    for p in range(N):
        for m in range(N):
            for i in range(n**2):
                for j in range(n**2):        
                    chi[p,m] += 2/(n**2)*np.dot(conf[i],conf[j])*np.cos(kx[p]*(x[i]-x[j])+ ky[m]*(y[i]-y[j]))
    return(chi,kx,ky)

我的问题是,我需要大约100*100个点,用kx*ky表示,对于一个有72*72个向量的晶格,循环需要很多小时才能完成这项工作。 计算次数:72*72*72*72*100*100 我不能使用numpy的内置FFT,因为我是三角形网格,所以我需要一些其他选项来减少这里的计算成本。

我的想法:首先,我认识到将配置重塑为向量列表而不是矩阵可以降低计算成本。此外,我使用了Numba包,它也降低了成本,但仍然太慢。我发现计算这类对象的一个好方法是np.einsum函数。计算每个向量与每个向量的乘积的方法如下:

np.einsum('ij,kj -> ik',np.reshape(S,(72**2,3)),np.reshape(S,(72**2,3)))

棘手的部分是np.cos中的项的计算。这里我想计算形状列表(100,1)与向量的位置(例如np.shape(x)=(72**2,1))之间的乘积。尤其是我真的不知道如何用np.einsum实现x方向和y方向的距离。

要重新生成代码(您可能不需要这样做):首先,您需要一个向量配置。您可以简单地使用np.ones((72,72,3)或以随机向量为例:

def spherical_to_cartesian(r, theta, phi):
    '''Convert spherical coordinates (physics convention) to cartesian coordinates'''
    sin_theta = np.sin(theta)
    x = r * sin_theta * np.cos(phi)
    y = r * sin_theta * np.sin(phi)
    z = r * np.cos(theta)

    return x, y, z # return a tuple

def random_directions(n, r):
    '''Return ``n`` 3-vectors in random directions with radius ``r``'''
    out = np.empty(shape=(n,3), dtype=np.float64)

    for i in range(n):
        # Pick directions randomly in solid angle
        phi = random.uniform(0, 2*np.pi)
        theta = np.arccos(random.uniform(-1, 1))
        # unpack a tuple
        x, y, z = spherical_to_cartesian(r, theta, phi)
        out[i] = x, y, z

    return out
S = np.reshape(random_directions(72**2,1),(72,72,3))

(此示例中的重塑需要在函数spin_spin中将其重塑为(72**2,3)形状。)

对于向量的位置,我使用

定义的三角形网格
def triangular(nsize):
    '''Positional arguments of the spin configuration'''

    X=np.zeros((nsize,nsize))
    Y=np.zeros((nsize,nsize))
    for i in range(nsize):
        for j in range(nsize):
            X[i,j]+=1/2*j+i
            Y[i,j]+=np.sqrt(3)/2*j
    return(X,Y)

解决方案

优化的Numba实现

代码中的主要问题是使用非常小的数据重复调用外部BLAS函数np.dot。在这段代码中,只计算一次它们会更有意义,但如果您必须在循环中进行计算,请编写一个Numba实现。Example

优化功能(暴力)

import numpy as np
import numba as nb

@nb.njit(fastmath=True,error_model="numpy",parallel=True)
def spin_spin(S,N):
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    chi = np.zeros((N,N))
    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N).astype(np.float32)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N).astype(np.float32)

    x=np.reshape(triangular(n)[0],(n**2)).astype(np.float32)
    y=np.reshape(triangular(n)[1],(n**2)).astype(np.float32)

    #precalc some values
    fact=nb.float32(2/(n**2))
    conf_dot=np.dot(conf,conf.T).astype(np.float32)

    for p in nb.prange(N):
        for m in range(N):
            #accumulating on a scalar is often beneficial
            acc=nb.float32(0)
            for i in range(n**2):
                for j in range(n**2):        
                    acc+= conf_dot[i,j]*np.cos(kx[p]*(x[i]-x[j])+ ky[m]*(y[i]-y[j]))
            chi[p,m]=fact*acc

    return(chi,kx,ky)

优化函数(删除冗余计算)

做了很多重复的计算。这是一个关于如何删除它们的示例。这也是一个以双精度进行计算的版本。

@nb.njit()
def precalc(S):
    #There may not be all redundancies removed
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    conf_dot=np.dot(conf,conf.T)
    x=np.reshape(triangular(n)[0],(n**2))
    y=np.reshape(triangular(n)[1],(n**2))

    x_s=set()
    y_s=set()
    for i in range(n**2):
        for j in range(n**2):
            x_s.add((x[i]-x[j]))
            y_s.add((y[i]-y[j]))

    x_arr=np.sort(np.array(list(x_s)))
    y_arr=np.sort(np.array(list(y_s)))


    conf_dot_sel=np.zeros((x_arr.shape[0],y_arr.shape[0]))
    for i in range(n**2):
        for j in range(n**2):
            ii=np.searchsorted(x_arr,x[i]-x[j])
            jj=np.searchsorted(y_arr,y[i]-y[j])
            conf_dot_sel[ii,jj]+=conf_dot[i,j]

    return x_arr,y_arr,conf_dot_sel

@nb.njit(fastmath=True,error_model="numpy",parallel=True)
def spin_spin_opt_2(S,N):
    chi = np.empty((N,N))
    n= len(S)

    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N)

    x_arr,y_arr,conf_dot_sel=precalc(S)
    fact=2/(n**2)
    for p in nb.prange(N):
        for m in range(N):
            acc=nb.float32(0)
            for i in range(x_arr.shape[0]):
                for j in range(y_arr.shape[0]):        
                    acc+= fact*conf_dot_sel[i,j]*np.cos(kx[p]*x_arr[i]+ ky[m]*y_arr[j])
            chi[p,m]=acc

    return(chi,kx,ky)

@nb.njit()
def precalc(S):
    #There may not be all redundancies removed
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    conf_dot=np.dot(conf,conf.T)
    x=np.reshape(triangular(n)[0],(n**2))
    y=np.reshape(triangular(n)[1],(n**2))

    x_s=set()
    y_s=set()
    for i in range(n**2):
        for j in range(n**2):
            x_s.add((x[i]-x[j]))
            y_s.add((y[i]-y[j]))

    x_arr=np.sort(np.array(list(x_s)))
    y_arr=np.sort(np.array(list(y_s)))


    conf_dot_sel=np.zeros((x_arr.shape[0],y_arr.shape[0]))
    for i in range(n**2):
        for j in range(n**2):
            ii=np.searchsorted(x_arr,x[i]-x[j])
            jj=np.searchsorted(y_arr,y[i]-y[j])
            conf_dot_sel[ii,jj]+=conf_dot[i,j]

    return x_arr,y_arr,conf_dot_sel

@nb.njit(fastmath=True,error_model="numpy",parallel=True)
def spin_spin_opt_2(S,N):
    chi = np.empty((N,N))
    n= len(S)

    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N)

    x_arr,y_arr,conf_dot_sel=precalc(S)
    fact=2/(n**2)
    for p in nb.prange(N):
        for m in range(N):
            acc=nb.float32(0)
            for i in range(x_arr.shape[0]):
                for j in range(y_arr.shape[0]):        
                    acc+= fact*conf_dot_sel[i,j]*np.cos(kx[p]*x_arr[i]+ ky[m]*y_arr[j])
            chi[p,m]=acc

    return(chi,kx,ky)

计时

#brute-force
%timeit res=spin_spin(S,100)
#48 s ± 671 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

#new version
%timeit res_2=spin_spin_opt_2(S,100)
#5.33 s ± 59.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit res_2=spin_spin_opt_2(S,1000)
#1min 23s ± 2.43 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

编辑(SVML-Check)

import numba as nb
import numpy as np

@nb.njit(fastmath=True)
def foo(n):
    x   = np.empty(n*8, dtype=np.float64)
    ret = np.empty_like(x)
    for i in range(ret.size):
            ret[i] += np.cos(x[i])
    return ret

foo(1000)

if 'intel_svmlcc' in foo.inspect_llvm(foo.signatures[0]):
    print("found")
else:
    print("not found")

#found

如果有not foundReadthis link.,它应该可以在Linux和Windows上运行,但我还没有在MacOS上测试它。

相关文章