How to sync DynamoDB to Elasticsearch with insert/modify/delete

Problem description

How do I pass this entire document into Elasticsearch using Python? Is this the right way to put it into Elasticsearch?

In DynamoDB, id is the primary key.

This is how I insert into DynamoDB:

import boto3

def lambda_handler(event, context):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('newtable')
    # overwrite_by_pkeys: if two buffered puts share an id, only the last one is sent
    with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
            batch.put_item(
                Item={
                    'id': '1',
                    'last_name': 'V',
                    'age': '2',
                }
            )
            batch.put_item(
                Item={
                    'id': '2',
                    'last_name': 'JJ',
                    'age': '7',
                }
            )
            batch.put_item(
                Item={
                    'id': '9',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )
            batch.put_item(
                Item={
                    'id': '10',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )

  • How do I push the expected output into Elasticsearch?

      How can changes to the DynamoDB content be reflected in ES automatically?

      I have gone through https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/
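The approach in that post relies on a DynamoDB stream on the table plus an event source mapping that invokes the Lambda with batches of stream records; that is what makes ES follow DynamoDB changes automatically. Below is a minimal sketch of that wiring with boto3, assuming the table exists and does not have a stream yet. connect_table_to_lambda and the table/function names passed to it are hypothetical, and the IAM permissions the function needs to read the stream are not shown.

```python
# Sketch: enable a stream on an existing table and let it trigger a Lambda function.
# Table and function names are hypothetical placeholders supplied by the caller.

STREAM_SPEC = {
    "StreamEnabled": True,
    # NEW_AND_OLD_IMAGES: NewImage on INSERT/MODIFY, OldImage on MODIFY/REMOVE
    "StreamViewType": "NEW_AND_OLD_IMAGES",
}


def connect_table_to_lambda(dynamodb_client, lambda_client, table_name, function_name):
    """Turn on the table's stream and map it to the function; returns the stream ARN.

    dynamodb_client and lambda_client are boto3 clients, e.g.
    boto3.client("dynamodb") and boto3.client("lambda").
    """
    resp = dynamodb_client.update_table(
        TableName=table_name,
        StreamSpecification=STREAM_SPEC,
    )
    stream_arn = resp["TableDescription"]["LatestStreamArn"]
    lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="LATEST",  # start with new changes, not the stream's backlog
        BatchSize=100,
    )
    return stream_arn
```

Usage would look like connect_table_to_lambda(boto3.client("dynamodb"), boto3.client("lambda"), "newtable", "my-sync-function"). A stream only carries changes made after it is enabled, so items written earlier would need a one-off backfill into ES.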

      Below is my code; I am getting the error ERROR: NameError("name 'event' is not defined")

      Code (before this, I set up the DynamoDB table to trigger the Lambda function below):

      import boto3
      import json
      import re
      from requests_aws4auth import AWS4Auth
      from elasticsearch import Elasticsearch, RequestsHttpConnection
      
      session = boto3.session.Session()
      credentials = session.get_credentials()
      # s3 = session.resource('s3')
      awsauth = AWS4Auth(credentials.access_key,
                         credentials.secret_key,
                         session.region_name, 'es',
                         session_token=credentials.token)
      es = Elasticsearch(
          ['https://xx-east-1.es.amazonaws.com'],
          http_auth=awsauth,
          use_ssl=True,
          verify_certs=True,
          connection_class=RequestsHttpConnection
      )
      reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                         "_timestamp", "_ttl"]
      
      
      def lambda_handler(event, context):
          print(event)
          dynamodb = boto3.resource('dynamodb')
      
          # Loop over the DynamoDB Stream records
          for record in event['Records']:
      
              try:
      
                  if record['eventName'] == "INSERT":
                      insert_document(es, record)
                  elif record['eventName'] == "REMOVE":
                      remove_document(es, record)
                  elif record['eventName'] == "MODIFY":
                      modify_document(es, record)
      
              except Exception as e:
                  print("Failed to process:")
                  print(json.dumps(record))
                  print("ERROR: " + repr(e))
                  continue
      
      
      # Process MODIFY events
      def modify_document(es, record):
          table = getTable(record)
          print("Dynamo Table: " + table)
      
          docId = docid(record)
          print("KEY")
          print(docId)
      
          # Unmarshal the DynamoDB JSON to a normal JSON
          doc = json.dumps(document())
      
          print("Updated document:")
          print(doc)
      
          # We reindex the whole document as ES accepts partial docs
          es.index(index=table,
                   body=doc,
                   id=docId,
                   doc_type=table,
                   refresh=True)
      
          print("Successly modified - Index: " + table + " - Document ID: " + docId)
      
      
      def remove_document(es, record):
          table = getTable(record)
          print("Dynamo Table: " + table)
      
          docId = docid(record)
          print("Deleting document ID: " + docId)
      
          es.delete(index=table,
                    id=docId,
                    doc_type=table,
                    refresh=True)
      
          print("Successly removed - Index: " + table + " - Document ID: " + docId)
      
      
      # Process INSERT events
      def insert_document(es, record):
          table = getTable(record)
          print("Dynamo Table: " + table)
      
          # Create index if missing
          if es.indices.exists(table) == False:
              print("Create missing index: " + table)
      
              es.indices.create(table,
                                body='{"settings": { "index.mapping.coerce": true } }')
      
              print("Index created: " + table)
      
          # Unmarshal the DynamoDB JSON to a normal JSON
          doc = json.dumps(document())
      
          print("New document to Index:")
          print(doc)
      
          newId = docid(record)
          es.index(index=table,
                   body=doc,
                   id=newId,
                   doc_type=table,
                   refresh=True)
      
          print("Successly inserted - Index: " + table + " - Document ID: " + newId)
      
      
      def getTable(record):
          p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
          m = p.match(record['eventSourceARN'])
          if m is None:
              raise Exception("Table not found in SourceARN")
          return m.group(1).lower()
      
      
      def document(event):
          result = []
          for r in event['Records']:
              tmp = {}
              for k, v in r['dynamodb']['NewImage'].items():
                  if "S" in v.keys() or "BOOL" in v.keys():
                      tmp[k] = v.get('S', v.get('BOOL', False))
                  elif 'NULL' in v:
                      tmp[k] = None
              result.append(tmp)
              for i in result:
                  return i
      
      
      def docid(event):
          result = []
          for r in event['Records']:
              tmp = {}
              for k, v in r['dynamodb']['Keys'].items():
                  if "S" in v.keys() or "BOOL" in v.keys():
                      tmp[k] = v.get('S', v.get('BOOL', False))
                  elif 'NULL' in v:
                      tmp[k] = None
              result.append(tmp)
          for newId in result:
              return newId
      

      I am getting the error in document and docid.

      Run on its own, each of them does produce output:

      result = []
      for r in event['Records']:
          tmp = {}
      
          for k, v in r['dynamodb']['NewImage'].items():
          #for k, v in r['dynamodb']['Keys'].items():
              if "S" in v.keys() or "BOOL" in v.keys():
                  tmp[k] = v.get('S', v.get('BOOL', False))
              elif 'NULL' in v:
                  tmp[k] = None
      
          result.append(tmp)
      for i in result:
          print (i)
      
      event = {'Records': [
          {'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}}, 'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'},
          {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}}, 'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}}, 'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'},
          {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}}, 'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'},
          {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}}, 'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}}, 'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}
      ]}
      


      Solution

      You can check the following. I tried to replicate the issue and can confirm the error:

      ERROR: NameError("name 'event' is not defined")
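One way to get exactly this message is a helper that reads event without receiving it as a parameter: event only exists inside lambda_handler, so the lookup falls back to module globals and fails. The sketch below reproduces that and shows the fix the modified code applies, namely passing the event into the helper; document_broken, document_fixed and this stand-in lambda_handler are hypothetical names for illustration.

```python
def document_broken():
    # `event` is only a parameter of lambda_handler, so this lookup searches
    # module globals, finds nothing, and raises NameError.
    return event["Records"][0]["dynamodb"]["NewImage"]


def document_fixed(event):
    # Receiving the data as an argument puts it in this function's scope.
    return event["Records"][0]["dynamodb"]["NewImage"]


def lambda_handler(event, context):
    try:
        document_broken()
    except NameError as e:
        # Prints: ERROR: NameError("name 'event' is not defined")
        print("ERROR: " + repr(e))
    return document_fixed(event)
```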
      

      I used a simulated INSERT event from a DynamoDB stream, based on your example and my own table:

      {
        "Records": [
          {
            "eventID": "b8b993cf16d1aacb61b40411b39e0b1f",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
              "ApproximateCreationDateTime": 1595922821.0,
              "Keys": {
                "id": {
                  "N": "1"
                }
              },
              "NewImage": {
                "last_name": {
                  "S": "V"
                },
                "id": {
                  "N": "1"
                },
                "age": {
                  "S": "2"
                }
              },
              "SequenceNumber": "25200000000020406897812",
              "SizeBytes": 22,
              "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
          },
          {
            "eventID": "e5d5bec988945c06ffc879cf16b89bf7",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
              "ApproximateCreationDateTime": 1595922821.0,
              "Keys": {
                "id": {
                  "N": "9"
                }
              },
              "NewImage": {
                "last_name": {
                  "S": "ADD"
                },
                "id": {
                  "N": "9"
                },
                "age": {
                  "S": "95"
                }
              },
              "SequenceNumber": "25300000000020406897813",
              "SizeBytes": 25,
              "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
          },
          {
            "eventID": "f1a7c9736253b5ef28ced38ed5ff645b",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
              "ApproximateCreationDateTime": 1595922821.0,
              "Keys": {
                "id": {
                  "N": "2"
                }
              },
              "NewImage": {
                "last_name": {
                  "S": "JJ"
                },
                "id": {
                  "N": "2"
                },
                "age": {
                  "S": "7"
                }
              },
              "SequenceNumber": "25400000000020406897819",
              "SizeBytes": 23,
              "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
          },
          {
            "eventID": "bfcbad9dc19883e4172e6dc25e66637b",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
              "ApproximateCreationDateTime": 1595922821.0,
              "Keys": {
                "id": {
                  "N": "10"
                }
              },
              "NewImage": {
                "last_name": {
                  "S": "ADD"
                },
                "id": {
                  "N": "10"
                },
                "age": {
                  "S": "95"
                }
              },
              "SequenceNumber": "25500000000020406897820",
              "SizeBytes": 25,
              "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
          }
        ]
      }
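To replay a saved simulated event like this one outside AWS, it can be stored as a JSON file and passed straight to the handler. A sketch, assuming a hypothetical stand-in handler that only reports what it would do; the real lambda_handler can be passed instead, but it also needs a reachable ES endpoint.

```python
import json


def lambda_handler(event, context):
    # Hypothetical stand-in: list (eventName, Keys) per record instead of calling ES.
    return [(r["eventName"], r["dynamodb"]["Keys"]) for r in event["Records"]]


def replay(path, handler=lambda_handler):
    """Load a saved DynamoDB Streams event from a JSON file and invoke the handler."""
    with open(path, encoding="utf-8") as f:
        event = json.load(f)
    return handler(event, None)  # these handlers do not use the context object
```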
      

      Example MODIFY event:

      {
        "Records": [
          {
            "eventID": "4e4629c88aa00e366c89a293d9c82d54",
            "eventName": "MODIFY",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
              "ApproximateCreationDateTime": 1595924589.0,
              "Keys": {
                "id": {
                  "N": "2"
                }
              },
              "NewImage": {
                "last_name": {
                  "S": "zhgdhfgdh"
                },
                "id": {
                  "N": "2"
                },
                "age": {
                  "S": "7"
                }
              },
              "OldImage": {
                "last_name": {
                  "S": "JJ"
                },
                "id": {
                  "N": "2"
                },
                "age": {
                  "S": "7"
                }
              },
              "SequenceNumber": "25600000000020408264140",
              "SizeBytes": 49,
              "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569"
          }
        ]
      }
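With NEW_AND_OLD_IMAGES, a MODIFY record carries both the old and the new item, so the changed attributes can be read directly from the record; in this sample only last_name differs. A small sketch (changed_attributes is a hypothetical helper) that compares the typed values as they arrive:

```python
def changed_attributes(record):
    """Names of attributes whose DynamoDB-typed value differs between OldImage and NewImage."""
    ddb = record["dynamodb"]
    old = ddb.get("OldImage", {})  # absent on INSERT
    new = ddb.get("NewImage", {})  # absent on REMOVE
    return sorted(k for k in old.keys() | new.keys() if old.get(k) != new.get(k))
```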
      

      Modified code of the Lambda function, which I can confirm no longer produces errors:

      import boto3
      import json
      import re
      
      from requests_aws4auth import AWS4Auth
      from elasticsearch import Elasticsearch, RequestsHttpConnection
      
      session = boto3.session.Session()
      credentials = session.get_credentials()
      
      s3 = session.resource('s3')
      
      awsauth = AWS4Auth(credentials.access_key,
                        credentials.secret_key,
                        session.region_name, 'es',
                        session_token=credentials.token)
      
          
      es = Elasticsearch(
          ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'],
          use_ssl=True,
          verify_certs=True,
          http_auth=awsauth,
          connection_class=RequestsHttpConnection
      )
      reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                         "_timestamp", "_ttl"]
      
      
      def lambda_handler(event, context):
          print(event)
          #dynamodb = boto3.resource('dynamodb')
      
          # Loop over the DynamoDB Stream records
          for record in event['Records']:
                  
              if record['eventName'] == "INSERT":
                  insert_document(event, es, record)
              elif record['eventName'] == "REMOVE":
                  remove_document(event, es, record)
              elif record['eventName'] == "MODIFY":
                  modify_document(event, es, record)
      
      
      # Process MODIFY events
      def modify_document(event, es, record):
          table = getTable(record)
          print("Dynamo Table: " + table)
      
          docId = docid(event, record)
          print("KEY")
          print(docId)
      
          # Unmarshal the DynamoDB JSON to a normal JSON
          doc = json.dumps(document(event))
      
          print("Updated document:")
          print(doc)
      
          # We reindex the whole document as ES accepts partial docs
          es.index(index=table,
                   body=doc,
                   id=docId,
                   doc_type=table,
                   refresh=True)
      
          print("Successly modified - Index: " , table , " - Document ID: " , docId)
      
      
      def remove_document(event, es, record):
          
          table = getTable(record)
          
          print("Dynamo Table: " + table)
      
          docId = docid(event, record)
          print("Deleting document ID: ", docId)
      
          es.delete(index=table,
                    id=docId,
                    doc_type=table,
                    refresh=True)
      
          print("Successly removed - Index: ", table, " - Document ID: " , docId)
      
      
      # Process INSERT events
      def insert_document(event, es, record):
          table = getTable(record)
          print("Dynamo Table: " + table)
      
          # Create index if missing
          if es.indices.exists(table) == False:
              print("Create missing index: " + table)
      
              es.indices.create(table,
                                body='{"settings": { "index.mapping.coerce": true } }')
      
              print("Index created: " + table)
      
          # Unmarshal the DynamoDB JSON to a normal JSON
          doc = json.dumps(document(event))
      
          print("New document to Index:")
          print(doc)
      
          newId = docid(event, record)
          
          es.index(index=table,
                   body=doc,
                   id=newId,
                   doc_type=table,
                   refresh=True)
      
          print("Successly inserted - Index: " , table + " - Document ID: " , newId)
      
      
      def getTable(record):
          p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
          m = p.match(record['eventSourceARN'])
          if m is None:
              raise Exception("Table not found in SourceARN")
          return m.group(1).lower()
      
      
      def document(event):
          result = []
          for r in event['Records']:
              tmp = {}
              for k, v in r['dynamodb']['NewImage'].items():
                  if "S" in v.keys() or "BOOL" in v.keys():
                      tmp[k] = v.get('S', v.get('BOOL', False))
                  elif 'NULL' in v:
                      tmp[k] = None
              result.append(tmp)
              for i in result:
                  return i
      
      
      def docid(event, record):
          result = []
          for r in event['Records']:
              tmp = {}
              for k, v in r['dynamodb']['Keys'].items():
                  if "S" in v.keys() or "BOOL" in v.keys():
                      tmp[k] = v.get('S', v.get('BOOL', False))
                  elif 'NULL' in v:
                      tmp[k] = None
              result.append(tmp)
          for newId in result:
              return newId
      

      I haven't verified whether the data is correctly written, modified or inserted into Elasticsearch. However, I had an ES domain running and used it in the Lambda to verify that the Lambda can connect to it and run queries.
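To check that a write actually landed, the document can be read back by ID. A sketch, assuming es is an elasticsearch-py Elasticsearch client (any object whose get(index=..., id=...) returns a mapping with "_source" works); verify_indexed and the sample values are hypothetical. With elasticsearch-py, get raises NotFoundError when no document has that ID.

```python
def verify_indexed(es, index, doc_id, expected):
    """Read a document back by ID and check every expected field/value pair.

    Returns True when all expected fields match; prints the mismatches otherwise.
    """
    stored = es.get(index=index, id=doc_id)["_source"]
    mismatches = {k: stored.get(k) for k, v in expected.items() if stored.get(k) != v}
    if mismatches:
        print("Mismatched fields in", index, doc_id, ":", mismatches)
    return not mismatches
```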

      Example output from the Lambda for the INSERT event:

      Dynamo Table: newtable
      New document to Index:
      {"last_name": "V", "age": "2"}
      Successly inserted - Index:  newtable - Document ID:  {}
      Dynamo Table: newtable
      New document to Index:
      {"last_name": "V", "age": "2"}
      Successly inserted - Index:  newtable - Document ID:  {}
      Dynamo Table: newtable
      New document to Index:
      {"last_name": "V", "age": "2"}
      Successly inserted - Index:  newtable - Document ID:  {}
      Dynamo Table: newtable
      New document to Index:
      {"last_name": "V", "age": "2"}
      Successly inserted - Index:  newtable - Document ID:  {}
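All four INSERT records print the same document, and none of them contains id. Both follow from document(event): it loops over every record in the event but returns inside that loop, so it always yields the first record's NewImage, and it only copies S, BOOL and NULL values, so the N-typed id is dropped. Below is a sketch of a per-record version that also covers the other DynamoDB JSON types; converting N to int or float is an assumption (DynamoDB sends numbers as strings), and unmarshal/_number are hypothetical helper names.

```python
def _number(text):
    # DynamoDB JSON carries numbers as strings; int when integral, else float (may lose precision)
    try:
        return int(text)
    except ValueError:
        return float(text)


def unmarshal(value):
    """Convert one DynamoDB-typed value such as {"S": "V"} or {"N": "9"} to plain Python."""
    tag, inner = next(iter(value.items()))
    if tag in ("S", "B", "BOOL"):
        return inner  # strings, base64 binary text and booleans pass through
    if tag == "N":
        return _number(inner)
    if tag == "NULL":
        return None
    if tag == "M":
        return {k: unmarshal(v) for k, v in inner.items()}
    if tag == "L":
        return [unmarshal(v) for v in inner]
    if tag == "NS":
        return [_number(n) for n in inner]
    if tag in ("SS", "BS"):
        return list(inner)
    raise ValueError("unsupported DynamoDB type: " + tag)


def document(record):
    """Plain dict for one INSERT or MODIFY record's NewImage."""
    return {k: unmarshal(v) for k, v in record["dynamodb"]["NewImage"].items()}
```

The handler would then call document(record) inside its loop instead of document(event). boto3 also ships boto3.dynamodb.types.TypeDeserializer, but it returns Decimal for numbers, which json.dumps rejects without a custom default.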
      
      Example output from the Lambda for the MODIFY event:
      
      

      Updated document:

      {
          "last_name": "zhgdhfgdh",
          "age": "7"
      }
      Successly modified - Index:  newtable  - Document ID:  
      {}
      

      I think docid needs further investigation to check whether it works correctly, as it seems to return an empty dict:

       Document ID:  {}
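The empty dict comes from the key type: in the simulated events the key is numeric ({"N": "1"}), while docid only copies S and BOOL values, so nothing is kept. It also scans event['Records'] rather than the record being processed, and its result is a dict, whereas ES expects a string document ID. A per-record sketch, assuming a simple or composite key whose parts are joined with "|" (an arbitrary, hypothetical separator):

```python
def docid(record):
    """Return the record's key value(s) as a string usable as an Elasticsearch _id."""
    keys = record["dynamodb"]["Keys"]  # present on INSERT, MODIFY and REMOVE
    parts = []
    for name in sorted(keys):  # sorted so a composite key always yields the same ID
        # Key attributes are S, N or B, all sent as strings, e.g. {"N": "1"} -> "1"
        (value,) = keys[name].values()
        parts.append(str(value))
    return "|".join(parts)
```

In the handler this would be called as docid(record) for each record, and the result passed as id to es.index and es.delete.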
      
