如何实现支持命名空间的 FIFO 队列

2022-01-21 00:00:00 python google-app-engine queue fifo

问题描述

我正在使用以下方法来处理基于 Google App Engine db.Model 的 FIFO 队列(查看这个问题).

I'm using the following approach to handle a FIFO queue based on Google App Engine db.Model (see this question).

from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import run_wsgi_app

class QueueItem(db.Model):
  created = db.DateTimeProperty(required=True, auto_now_add=True)
  data = db.BlobProperty(required=True)

  @staticmethod
  def push(data):
    """Add a new queue item."""
    return QueueItem(data=data).put()

  @staticmethod
  def pop():
    """Pop the oldest item off the queue."""
    def _tx_pop(candidate_key):
      # Try and grab the candidate key for ourselves. This will fail if
      # another task beat us to it.
      task = QueueItem.get(candidate_key)
      if task:
        task.delete()
      return task
    # Grab some tasks and try getting them until we find one that hasn't been
    # taken by someone else ahead of us
    while True:
      candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
      if not candidate_keys:
        # No tasks in queue
        return None
      for candidate_key in candidate_keys:
        task = db.run_in_transaction(_tx_pop, candidate_key)
        if task:
          return task

此队列按预期工作(非常好).

This queue works as expected (very good).

现在我的代码有一个方法可以访问延迟队列调用的这个 FIFO 队列:

Right now my code has a method that access this FIFO queue invoked by a deferred queue:

def deferred_worker():
        data= QueueItem.pop()
        do_something_with(data)

我想增强此方法和队列数据结构,添加一个 client_ID 参数,表示需要访问自己的队列的特定客户端.比如:

I would like to enhance this method and the queue data structure adding a client_ID parameter representing a specific client that needs to access its own Queue. Something like:

def deferred_worker(client_ID):
        data= QueueItem_of_this_client_ID.pop() # I need to implement this
        do_something_with(data)

如何将队列编码为可识别 client_ID?

How could I code the Queue to be client_ID aware?

约束:
- 客户端数量是动态的,不是预定义的
- 任务队列不是一个选项(1. 最多十个队列 2. 我想完全控制我的队列)

Constraints:
- The number of clients is dynamic and not predefined
- Taskqueue is not an option (1. ten max queues 2. I would like to have full control on my queue)

您知道如何使用新的 Namespaces api(记住我不是从 webapp.RequestHandler 调用 db.Model)?
另一种选择:我可以将 client_ID db.StringProperty 添加到 QueueItem 中,使用它在 pull 方法上有一个过滤器:

Do you know how could I add this behaviour using the new Namespaces api (Remember that I'm not calling the db.Model from a webapp.RequestHandler)?
Another option: I could add a client_ID db.StringProperty to the QueueItem using it has a filter on pull method:

QueueItem.all(keys_only=True).filter(client_ID=an_ID).order('created').fetch(10)

有更好的主意吗?


解决方案

假设您的客户端类"实际上是客户端调用的请求处理程序,您可以执行以下操作:

Assuming your "client class" is really a request handler the client calls, you could do something like this:

from google.appengine.api import users
from google.appengine.api.namespace_manager import set_namespace

class ClientClass(webapp.RequestHandler):
  def get(self):
    # For this example let's assume the user_id is your unique id.
    # You could just as easily use a parameter you are passed.
    user = users.get_current_user()
    if user:
       # If there is a user, use their queue.  Otherwise the global queue.
       set_namespace(user.user_id())

    item = QueueItem.pop()
    self.response.out.write(str(item))

    QueueItem.push('The next task.')

或者,您也可以设置命名空间应用范围.

Alternatively, you could also set the namespace app-wide.

通过设置默认命名空间,对数据存储的所有调用都将在该命名空间内",除非您明确指定.请注意,要获取和运行任务,您必须知道命名空间.因此,您可能希望在默认命名空间中维护一个命名空间列表以进行清理.

By setting the default namespace all calls to the datastore will be "within" that namespace, unless you explicitly specify otherwise. Just note, to fetch and run tasks you'll have to know the namespace. So you probably want to maintain a list of namespaces in the default namespace for cleanup purposes.

相关文章