Error when using INSERT INTO table ON DUPLICATE KEY with a for loop over an array

I am updating a MySQL database using PySpark, running on AWS Glue.


I have a dataframe as follows:

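df2 = sqlContext.createDataFrame(
    [("xxx1", "81A01", "TERR NAME 55", "NY"),
     ("xxx2", "81A01", "TERR NAME 55", "NY"),
     ("x103", "81A01", "TERR NAME 01", "NJ")],
    ["zip_code", "territory_code", "territory_name", "state"])

# Print out information about this data
df2.show()

+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
|    xxx1|         81A01|  TERR NAME 55|   NY|
|    xxx2|         81A01|  TERR NAME 55|   NY|
|    x103|         81A01|  TERR NAME 01|   NJ|
+--------+--------------+--------------+-----+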

ZIP_CODE is the primary key, and I need to avoid duplicate-key (primary key) exceptions, so I am using INSERT INTO ... ON DUPLICATE KEY UPDATE.

Since I have more than one row to insert or update, I collect the records into a Python list, loop over it with a for loop, and run the INSERT against the database for each record. The code is as follows:

sarry = df2.collect()
for r in sarry:
     db = MySQLdb.connect("xxxx.rds.amazonaws.com", "username", "password", 
     cursor = db.cursor()
     insertQry = ("INSERT INTO ZIP_TERR(zip_code, territory_code, territory_name, state) "
                  "VALUES(r.zip_code, r.territory_code, r.territory_name, r.state) "
                  "ON DUPLICATE KEY UPDATE territory_name = VALUES(territory_name), "
                  "state = VALUES(state)")
     cursor.execute(insertQry)


When I run the insert query above, I get the following error message and cannot work out what causes it. Please help.

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-2291407229037300959.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-2291407229037300959.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 8, in <module>
  File "/usr/local/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/local/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 893, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 1103, in _read_query_result
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 1396, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 1059, in _read_packet
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 384, in check_error
  File "/usr/local/lib/python2.7/site-packages/pymysql/err.py", line 109, in raise_mysql_exception
    raise errorclass(errno, errval)
InternalError: (1054, u"Unknown column 'r.zip_code' in 'field list'")


If I simply print the values for one row, they are printed as follows:

print('zip_code_new: ', r.zip_code, r.territory_code, r.territory_name, r.state)

zip_code_new:  xxx1 81A01 TERR NAME 55 NY
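
A possible reading of the 1054 error, offered as a sketch and not a confirmed diagnosis: print() receives r.zip_code after Python has evaluated it, while the same characters typed inside a string literal are sent to MySQL unchanged, so the server goes looking for a column named r.zip_code. The Row namedtuple below is a hypothetical stand-in for the Spark Row, so the snippet runs without Spark or a database.

```python
from collections import namedtuple

# Hypothetical stand-in for a pyspark Row; only attribute access matters here.
Row = namedtuple("Row", ["zip_code", "territory_code", "territory_name", "state"])
r = Row("xxx1", "81A01", "TERR NAME 55", "NY")

# Inside a string literal, "r.zip_code" is just text; Python never evaluates it.
literal_query = "INSERT INTO ZIP_TERR(zip_code) VALUES(r.zip_code)"
sends_attribute_name = "r.zip_code" in literal_query  # the name itself is in the SQL
sends_actual_value = r.zip_code in literal_query      # the value "xxx1" is not

# With %s placeholders, the values travel separately from the SQL text
# and the driver takes care of quoting them.
param_query = "INSERT INTO ZIP_TERR(zip_code) VALUES(%s)"
params = (r.zip_code,)

print(sends_attribute_name, sends_actual_value, params)
```

Passing the values as a second argument to cursor.execute() also avoids building the SQL by hand with string formatting.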

Thanks. I am working on AWS Glue/PySpark, so I need to use native Python libraries.


The following insert query works inside the for loop:

insertQry = "INSERT INTO ZIP_TERR(zip_code, territory_code, territory_name, state) VALUES(%s, %s, %s, %s) ON DUPLICATE KEY UPDATE territory_name = %s, state = %s;"

n=cursor.execute(insertQry, (r.zip_code, r.territory_code, r.territory_name, r.state, r.territory_name, r.state))
print (" CURSOR status :", n)


CURSOR status : 2
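
For anyone who wants to send all rows in one call, here is a minimal sketch of a batched variant. It is not what I ran above. The helper names upsert_rows and rows_to_params and the Row namedtuple are hypothetical; the only driver call used is the standard DB-API cursor.executemany(), and the cursor is assumed to use %s placeholders as pymysql and MySQLdb do.

```python
from collections import namedtuple

# VALUES(col) in the UPDATE clause reuses the value from the INSERT part,
# so each row needs four parameters instead of six.
UPSERT_SQL = (
    "INSERT INTO ZIP_TERR (zip_code, territory_code, territory_name, state) "
    "VALUES (%s, %s, %s, %s) "
    "ON DUPLICATE KEY UPDATE "
    "territory_name = VALUES(territory_name), state = VALUES(state)"
)


def rows_to_params(rows):
    """Turn row objects into parameter tuples in the column order of UPSERT_SQL."""
    return [(r.zip_code, r.territory_code, r.territory_name, r.state) for r in rows]


def upsert_rows(cursor, rows):
    """Send all rows in one executemany() call; returns how many rows were sent."""
    params = rows_to_params(rows)
    if params:
        cursor.executemany(UPSERT_SQL, params)
    return len(params)


# Hypothetical stand-in rows so the parameter building can be checked
# without Spark or a database.
Row = namedtuple("Row", ["zip_code", "territory_code", "territory_name", "state"])
sample_rows = [
    Row("xxx1", "81A01", "TERR NAME 55", "NY"),
    Row("xxx2", "81A01", "TERR NAME 55", "NY"),
    Row("x103", "81A01", "TERR NAME 01", "NJ"),
]
sample_params = rows_to_params(sample_rows)

# Assumed usage with a real connection (not run here):
#   db = pymysql.connect(...)            # open once, outside any loop
#   upsert_rows(db.cursor(), df2.collect())
#   db.commit()
```

Two notes, as far as I understand the MySQL documentation: the affected-rows count is 1 for a newly inserted row and 2 when an existing row is updated, which would explain the "CURSOR status : 2" above; and VALUES() inside ON DUPLICATE KEY UPDATE is deprecated in newer MySQL 8.0 releases in favor of a row alias, so the six-parameter form is the more conservative choice there.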


Thanks. I hope this is a useful reference for others.
