How to implement a distributed queue-like-thing on top of an RDBMS, a NoSQL data store, or another messaging system (e.g., RabbitMQ)?

2022-01-11 00:00:00 python nosql rabbitmq message-queue java

From the wouldn't-it-be-cool-if category of questions ...

By "queue-like-thing" I mean supports the following operations:

  • append(entry: Entry) - adds an entry to the tail of the queue
  • take(): Entry - removes the entry at the head of the queue and returns it
  • promote(entry_id) - moves the entry one position closer to the head; the entry currently occupying that position moves to the old position
  • demote(entry_id) - the opposite of promote(entry_id)

Optional operations would be something like:

  • promote(entry_id, amount) - like promote(entry_id), but you specify the number of positions
  • demote(entry_id, amount) - the opposite of promote(entry_id, amount)
  • of course, if we allow amount to be positive or negative, we could consolidate the promote/demote methods into a single move(entry_id, amount) method

It would be ideal if the following operations could be performed on the queue in a distributed fashion (multiple clients interacting with the queue):

queue = ...

queue.append( a )
queue.append( b )
queue.append( c )

print queue
"a b c"

queue.promote( b.id )
print queue
"b a c"

queue.demote( a.id )
print queue
"b c a"

x = queue.take()
print x
"b"
print queue
"c a"

Are there any data stores that are particularly apt for this use case? The queue should always be in a consistent state even if multiple users are modifying the queue simultaneously.

If it weren't for the promote/demote/move requirement, there wouldn't be much of a problem.

Bonus points if there are Java and/or Python libraries to accomplish the task outlined above.

Solution should scale extremely well.

Answer

Python: "Batteries Included"

Rather than looking to a data store like RabbitMQ, Redis, or an RDBMS, I think Python and a couple of libraries have more than enough to solve this problem. Some may complain that this do-it-yourself approach is re-inventing the wheel, but I prefer running a hundred lines of Python code over managing another data store.

The operations that you define: append, take, promote, and demote, describe a priority queue. Unfortunately Python doesn't have a built-in priority queue data type. But it does have a heap library called heapq, and priority queues are often implemented as heaps. Here's my implementation of a priority queue meeting your requirements:
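As a quick illustration of the heapq building block before the full class:

```python
import heapq

# heapq maintains a plain list in heap order: heappush inserts, and
# heappop always removes the smallest entry -- i.e., the highest-priority
# one when entries are (priority, value) tuples.
heap = []
for priority, value in [(3, 'c'), (1, 'a'), (2, 'b')]:
    heapq.heappush(heap, (priority, value))

first = heapq.heappop(heap)   # (1, 'a')
```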

import heapq


class PQueue:
    """
    Implements a priority queue with append, take, promote, and demote
    operations.
    """
    def __init__(self):
        """
        Initialize empty priority queue.
        self.toll is max(priority) and max(rowid) in the queue
        self.heap is the heap maintained for take command
        self.rows is a mapping from rowid to items
        self.pris is a mapping from priority to items
        """
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        """
        Append value to our priority queue.
        The new value is added with lowest priority as an item. Items are
        three-element lists consisting of [priority, rowid, value]. The rowid
        is used by the promote/demote commands.
        Returns the new rowid corresponding to the new item.
        """
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        """
        Take the highest priority item out of the queue.
        Returns the value of the item.
        """
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def promote(self, rowid):
        """
        Promote an item in the queue.
        The promoted item swaps position with the next highest item.
        Returns the number of affected rows.
        """
        if rowid not in self.rows: return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        next = item_pri - 1
        if next in self.pris:
            iota = self.pris[next]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0

The demote command is nearly identical to the promote command, so I'll omit it for brevity. Note that this depends only on Python's lists, dicts, and the heapq library.
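For completeness, here's a sketch of what the omitted demote might look like, mirroring promote but swapping with the next *lower*-priority item. It's shown on a minimal stand-in class containing just the fields demote touches (the real class also has take and promote as above):

```python
import heapq

class PQueue:
    """Minimal stand-in: just the state that append/demote need."""
    def __init__(self):
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        # Items are appended with increasing priority, so the list
        # stays a valid heap without an explicit heapify.
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def demote(self, rowid):
        """
        Demote an item in the queue.
        The demoted item swaps position with the next lowest item.
        Returns the number of affected rows.
        """
        if rowid not in self.rows: return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        prev = item_pri + 1
        if prev in self.pris:
            iota = self.pris[prev]
            iota_pri, iota_row, iota_val = iota
            # Priorities stay fixed to heap slots; only the rowid and
            # value move, so the heap invariant is preserved.
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0
```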

Now with the PQueue data type, we'd like to allow distributed interactions with an instance. A great library for this is gevent. Though gevent is relatively new and still beta, it's wonderfully fast and well tested. With gevent, we can set up a socket server listening on localhost:4040 pretty easily. Here's my server code:

# gsocket is gevent's cooperative socket module; `args` comes from an
# argparse setup (with -l/--listen and -c/--client flags) not shown here.
from gevent import socket as gsocket
from gevent.server import StreamServer

pqueue = PQueue()

def pqueue_server(sock, addr):
    text = sock.recv(1024)
    cmds = text.split(' ')
    if cmds[0] == 'append':
        result = pqueue.append(cmds[1])
    elif cmds[0] == 'take':
        result = pqueue.take()
    elif cmds[0] == 'promote':
        result = pqueue.promote(int(cmds[1]))
    elif cmds[0] == 'demote':
        result = pqueue.demote(int(cmds[1]))
    else:
        result = ''
    sock.sendall(str(result))
    print 'Request:', text, '; Response:', str(result)

if args.listen:
    server = StreamServer(('127.0.0.1', 4040), pqueue_server)
    print 'Starting pqueue server on port 4040...'
    server.serve_forever()

Before that runs in production, you'll of course want to do some better error/buffer handling. But it'll work just fine for rapid-prototyping. Notice that this doesn't require any locking around the pqueue object. Gevent doesn't actually run code in parallel, it just gives that impression. The drawback is that more cores won't help but the benefit is lock-free code.

Don't get me wrong, the gevent StreamServer will process multiple requests at the same time. But it switches between answering requests through cooperative multitasking. This means you have to yield the coroutine's time slice. While gevent's socket I/O functions are designed to yield, our pqueue implementation is not. Fortunately, the pqueue completes its tasks really quickly.

While prototyping, I found it useful to have a client as well. It took some googling to write a client so I'll share that code too:

# As in the server code, gsocket is gevent's socket module.
if args.client:
    while True:
        msg = raw_input('> ')
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(msg)
        text = sock.recv(1024)
        sock.close()
        print text

To use the new data store, first start the server and then start the client. At the client prompt you ought to be able to do:

> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two


Scaling Extremely Well

Given your thinking about a data store, it seems you're really concerned with throughput and durability. But "scale extremely well" doesn't quantify your needs. So I decided to benchmark the above with a test function. Here's the test function:

def test():
    import time
    import urllib2
    import subprocess

    import random
    random = random.Random(0)

    from progressbar import ProgressBar, Percentage, Bar, ETA
    widgets = [Percentage(), Bar(), ETA()]

    def make_name():
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        return ''.join(random.choice(alphabet)
                       for rpt in xrange(random.randrange(3, 20)))

    def make_request(cmds):
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(cmds)
        text = sock.recv(1024)
        sock.close()

    print 'Starting server and waiting 3 seconds.'
    # This launch command is Windows-specific; on other platforms, start
    # `python queue_thing_gevent.py -l` in a separate shell instead.
    subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                    shell=True)
    time.sleep(3)

    tests = []
    def wrap_test(name, limit=10000):
        def wrap(func):
            def wrapped():
                progress = ProgressBar(widgets=widgets)
                for rpt in progress(xrange(limit)):
                    func()
                secs = progress.seconds_elapsed
                print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                    name, limit, secs, limit / secs)
            tests.append(wrapped)
            return wrapped
        return wrap

    def direct_append():
        name = make_name()
        pqueue.append(name)

    count = 1000000
    @wrap_test('Loaded', count)
    def direct_append_test(): direct_append()

    def append():
        name = make_name()
        make_request('append ' + name)

    @wrap_test('Appended')
    def append_test(): append()

    ...

    print 'Running speed tests.'
    for tst in tests: tst()

Benchmark Results

I ran 6 tests against the server running on my laptop. I think the results scale extremely well. Here's the output:

Starting server and waiting 3 seconds.
Running speed tests.
100%|############################################################|Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%|############################################################|Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%|############################################################|Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%|############################################################|Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%|############################################################|Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%|############################################################|Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s

Final Frontier: Durability

Finally, durability is the only problem I didn't completely prototype. But I don't think it's that hard either. In our priority queue, the heap (list) of items has all the information we need to persist the data type to disk. Since gevent can also spawn functions to run concurrently as cooperative greenlets, I imagined using a function like this:

def save_heap(heap, toll):
    name = 'heap-{0}.txt'.format(toll)
    with open(name, 'w') as temp:
        for val in heap:
            temp.write(str(val) + '\n')  # one [priority, rowid, value] item per line
            gevent.sleep(0)  # yield so the server keeps answering requests

and adding a save function to our priority queue:

def save(self):
    heap_copy = tuple(self.heap)
    toll = self.toll
    gevent.spawn(save_heap, heap_copy, toll)
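Restoring a snapshot at startup wasn't prototyped either; a minimal sketch (assuming the hypothetical format above of one `str(item)` triple per line, and a `load_heap` name of my own invention) might look like:

```python
import ast
import heapq

def load_heap(name):
    """Rebuild priority-queue state from a heap snapshot file.

    Assumes one [priority, rowid, value] triple per line, as written
    by calling str() on each heap item.
    """
    heap, rows, pris, toll = [], {}, {}, 0
    with open(name) as snapshot:
        for line in snapshot:
            item = ast.literal_eval(line)   # parse the [pri, row, val] literal
            heap.append(item)
            rows[item[1]] = item
            pris[item[0]] = item
            toll = max(toll, item[0], item[1])
    heapq.heapify(heap)   # restore the heap invariant
    return heap, rows, pris, toll
```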

You could now copy the Redis model of forking and writing the data store to disk every few minutes. If you need even greater durability, then couple the above with a system that logs commands to disk. Together, those are the AOF (append-only file) and RDB persistence methods that Redis uses.
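A minimal sketch of the command-log half (the class and its name are mine, not from any library): record each incoming command line before applying it, and replay the log through the server's dispatch function on startup.

```python
import os

class CommandLog:
    """Append-only command log, in the spirit of Redis AOF persistence."""

    def __init__(self, path):
        self.path = path

    def record(self, command):
        # Append and flush so a crash loses at most the in-flight command.
        with open(self.path, 'a') as log:
            log.write(command + '\n')
            log.flush()

    def replay(self, apply):
        # Re-apply every logged command, e.g. via the server's dispatcher.
        if not os.path.exists(self.path):
            return
        with open(self.path) as log:
            for line in log:
                apply(line.rstrip('\n'))
```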
