使用 API 调用时适用油门 pandas
问题描述
我有一个带有地址列的大型 DataFrame:
I have a large DataFrame with an address column:
data addr
0 0.617964 IN,Krishnagiri,635115
1 0.635428 IN,Chennai,600005
2 0.630125 IN,Karnal,132001
3 0.981282 IN,Jaipur,302021
4 0.715813 IN,Chennai,600005
...
我写了下面的函数,用地址的经纬度坐标替换地址:
and I've written the following function to replace the address with the longitude and latitude coordinates of the address:
from geopy.geocoders import Nominatim
geo_locator = Nominatim(user_agent="MY_APP_ID")
def get_coordinates(addr):
location = geo_locator.geocode(addr)
if location is not None:
return pd.Series({'lat': location.latitude, 'lon': location.longitude})
location = geo_locator.geocode(addr.split(',')[0])
if location is not None:
return pd.Series({'lat': location.latitude, 'lon': location.longitude})
return pd.Series({'lat': -1, 'lon': -1})
然后在地址列上调用pandas apply方法,并将结果连接到DF的末尾而不是地址列:
Then calling pandas apply method on the address column, and concatinating the result to the end of the DF instead of the address column:
df = pd.concat([df, df.addr.apply(get_coordinates)], axis=1).drop(['addr'], axis=1)
但是,由于 get_coordinates 调用了第 3 方 API,因此失败:geopy.exc.GeocoderTimedOut: Service timed out
However, since the get_coordinates calls a 3rd party API it fails on: geopy.exc.GeocoderTimedOut: Service timed out
如何限制请求以确保在继续下一个值之前得到响应?
How do I throttle the requests to make sure I got a response before continuing to the next value?
更新:
为了进一步改进,我想仅在唯一值上调用 API,即:如果地址 IN,Krishnagiri,635115
在我的 DataFrame 中出现 20 次,我只想调用一次并应用所有 20 次出现的结果.
Update:
For further improvements, I would like to call the API only on unique values, i.e: if the address IN,Krishnagiri,635115
appears 20 times in my DataFrame, I would like to call it only once and apply the results to all 20 occurrences.
更新 2:
日志 + 堆栈跟踪,用于@Andrew Lavers 代码:
Update 2:
Log + Stack trace, for @Andrew Lavers code:
...
Fetched Gandipet, Khanapur, Rangareddy District, Telangana, 500075, India
Fetched Jaipur Municipal Corporation, Jaipur, Rajasthan, 302015, India
Fetched Chennai, Chennai district, Tamil Nadu, India
Exception from geolocator: Fake exception for testing
Backing off for 1 seconds.
Exception from geolocator: Fake exception for testing
Backing off for 3 seconds.
Fetched None
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/base.py", line 344, in _call_geocoder
page = requester(req, timeout=timeout, **kwargs)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 526, in open
response = self._open(req, data)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 544, in _open
'_open', req)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 504, in _call_chain
result = func(*args)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 1361, in https_open
context=self._context, check_hostname=self._check_hostname)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 1321, in do_open
r = h.getresponse()
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1331, in getresponse
response.begin()
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 297, in begin
version, status, reason = self._read_status()
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 258, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socket.py", line 586, in readinto
return self._sock.recv_into(b)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 1002, in recv_into
return self.read(nbytes, buffer)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 865, in read
return self._sslobj.read(len, buffer)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 625, in read
v = self._sslobj.read(len, buffer)
socket.timeout: The read operation timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/...//tmp.py", line 89, in <module>
df.addr.apply(get_coordinates)
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pandas/core/series.py", line 3194, in apply
mapped = lib.map_infer(values, f, convert=convert_dtype)
File "pandas/_libs/src/inference.pyx", line 1472, in pandas._libs.lib.map_infer
File "/Users/...//tmp.py", line 76, in get_coordinates
location = geo_locator.geocode(addr.split(',')[0])
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/osm.py", line 307, in geocode
self._call_geocoder(url, timeout=timeout), exactly_one
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/base.py", line 371, in _call_geocoder
raise GeocoderTimedOut('Service timed out')
geopy.exc.GeocoderTimedOut: Service timed out
Process finished with exit code 1
解决方案
这里有一些测试代码可能会有所帮助.1) 对 Api 指定的简单速率限制(Nominatum 似乎是每秒 1 次,但我的成功率低至 0.1 秒).2) 字典中的简单结果缓存,可通过测试参数控制 3) 具有乘法减速和线性加速的重试循环.(减速快,加速更慢) 4)伪造错误的测试异常
Here is some tested code that may help. 1) Simple rate limiting to what the Api specifies (Nominatum appears to be 1 per second but i got success as low as 0.1 seconds). 2) Simple result caching in a dictionary, controllable by parameter for testing 3) Retry loop with multiplicative slowdown and linear speedup. (slows down fast, speeds up more slowly) 4) Test exception for faking errors
我无法复制您遇到的问题 - 可能是由于您使用 API 的路径.
I cannot replicate the issues you are experiencing - likely due to your path to the API.
更健壮的策略可能是构建本地持久性缓存并继续重试,直到构建完整的批次.缓存可以是一个以 csv 格式写入文件的 pandas 数据帧.整个伪代码类似于.
A more robust strategy may to build a local persistence cache and continue to retry until the full batch is built. The cache could be a pandas dataframe written as csv to file. The overall pseudo code is something like.
repeat until all addresses are in the cache
cache = pd.read_csv("cache.csv)
addressess_to_get = addresses in df that are not in cache
for batch of n addresses in addresses_to_get:
cache.add(get_location(addr))
cache.write_csv("cache.csv")
这是测试代码
import datetime
import time
import pandas as pd
from geopy.geocoders import Nominatim
geo_locator = Nominatim(user_agent="notarealemail@gmail.com")
# Define the rate limit function and associated global variable
last_time = datetime.datetime.now()
backoff_time = 0
def rate_limit(min_interval_seconds = .1):
global last_time
sleep = min_interval_seconds - (datetime.datetime.now() - last_time).total_seconds()
if sleep > 0 :
print(f'Sleeping for {sleep} seconds')
time.sleep(sleep)
last_time = datetime.datetime.now()
# make a cache dictionary keyed by address
geo_cache = {}
backoff_seconds = 0
def get_coordinates_with_retry(addr):
# Return coords from global cache if it exists
global backoff_seconds
# set the backoff intital values and factors
max_backoff_seconds = 60
backoff_exponential = 2
backoff_linear = 2
# rate limited API call
rate_limit()
# Retry until max_back_seconds is reached
while backoff_seconds < max_backoff_seconds: # backoff up to this time
if backoff_seconds > 0:
print(f"Backing off for {backoff_seconds} seconds.")
time.sleep(backoff_seconds)
try:
location = geo_locator.geocode(addr)
# REMOVE THIS: fake an error for testing
#import random
#if random.random() < .3:
# raise(Exception("Fake exception for testing"))
# Success - so reduce the backoff linearly
print (f"Fetched {location} for address {addr}")
backoff_seconds = backoff_seconds - backoff_linear if backoff_seconds > backoff_linear else 0
break
except Exception as e:
print(f"Exception from geolocator: {e}")
# Backoff exponentially
backoff_seconds = 1 + backoff_seconds * backoff_exponential
if backoff_seconds > max_backoff_seconds:
raise Exception("Max backoff reached
")
return(location)
def get_coordinates(addr, useCache = True):
# Return from cache if previously loaded
global geo_cache
if addr in geo_cache:
return geo_cache[addr]
# Attempt using the full address
location = get_coordinates_with_retry(addr)
# Attempt using the first part only if None found
if location is not None:
result = pd.Series({'lat': location.latitude, 'lon': location.longitude})
else :
print (f"Trying split address for address {addr}")
location = get_coordinates_with_retry(addr.split(',')[0])
if location is not None:
result = pd.Series({'lat': location.latitude, 'lon': location.longitude})
else:
result = pd.Series({'lat': -1, 'lon': -1})
# assign to cache
if useCache:
geo_cache[addr] = result
return(result)
# Use the test data
df = pd.DataFrame({'addr' : [
'IN,Krishnagiri,635115',
'IN,Chennai,600005',
'IN,Karnal,132001',
'IN,Jaipur,302021',
'IN,Chennai,600005']})
# repeat the test data to make alarger set
df = pd.concat([df, df, df, df, df, df, df, df, df, df])
df.addr.apply(get_coordinates)
print(f"Address cache contains {len(geo_cache)} address locations.")
相关文章