How do I make sure I don't share the same socket between two threads at the same time?

I have code that deals with sockets, and I need to make sure that I don't share the same socket between two threads. In the code below, I have a background thread that runs every 60 seconds and calls the updateLiveSockets() method. In updateLiveSockets(), I iterate over all the sockets I have and ping them one by one by calling the send method of the SendToQueue class. Based on the response, I mark each one as live or dead.

All the reader threads call getNextSocket() concurrently to get the next available live socket, so it has to be thread safe, and I need to make sure all the reader threads see the same consistent state of SocketHolder and Socket.

Below is my SocketManager class:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty, so return one live socket chosen at random
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        boolean isLive = (status) ? true : false;

        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

And here is my SendToQueue class:

  // this method will be called by multiple threads concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
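    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    // NOTE: get() fails if no live socket is available; callers may want to check isPresent() first
    Socket socket = liveSocket.get().getSocket();
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
    cache.put(address, m);
    return doSendAsync(m, socket);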
  }

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      // Alternatively (checks that address points to m):
      // cache.asMap().remove(address, m);
      cache.invalidate(address);
    }
  }

Problem Statement

As you can see, I am sharing the same socket between two threads. getNextSocket() could return a 0MQ socket to thread A while, concurrently, the timer thread accesses the same 0MQ socket to ping it. In that case thread A and the timer thread are mutating the same 0MQ socket, which can lead to problems. So I am trying to find a way to prevent different threads from sending data on the same socket at the same time and corrupting my data.

So I decided to synchronize on the socket so that no two threads can access the same socket at the same time. Below is the change I made in the updateLiveSockets method, where I synchronize on the socket:

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // using the socket as its own lock
        synchronized (socket) {
            // pinging to see whether a socket is live or not
            boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
            boolean isLive = (status) ? true : false;

            SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
            liveUpdatedSockets.add(zmq);
        }
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }

And below is the change I made in the doSendAsync method. Here, too, I synchronize on the socket before sending on it.

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A by synchronizing on it
      synchronized (socket) {
        return msg.send(socket);
      }
    } finally {
      msg.destroy();
    }
  }

What is the best way to make sure that I am not sharing the same socket between two threads? In general, I have around 60 sockets and 20 threads accessing them.

If many threads use the same socket, resources aren't well utilized. Moreover, if msg.send(socket); blocks (technically it shouldn't), all threads waiting for that socket are blocked too. So I guess there might be a better way to ensure that every thread uses a different single live socket at any given time, instead of synchronizing on a particular socket. Also, is there any corner case or edge case I have missed that could lead to a bug?

Recommended Answer

First of all, you need a way for clients to notify you that they're done using a Socket. You could add a method that lets them signal this. That's legitimate and it will work, but you'd have to rely on your clients being well behaved, or rather on the programmer using your socket not forgetting to return it. There's a pattern that helps with this: the execute around pattern. Rather than handing out a Socket, you write a method that accepts a Consumer<Socket>, executes the consumer, and returns the Socket itself.

public void useSocket(Consumer<Socket> socketUser) {
    Socket socket = getSocket();
    try {
        socketUser.accept(socket);
    } finally {
        returnSocket(socket);
    }
}
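
The useful property of execute around is that the resource comes back even when the caller's code throws. Here is a minimal, single-threaded sketch of just that property, assuming a generic type T in place of Socket. The class name ExecuteAround and the available() helper are made up for illustration, and this holder is not thread safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical holder that only demonstrates the borrow/return shape.
// It is NOT thread safe; it exists to show the try/finally guarantee.
class ExecuteAround<T> {
    private final Deque<T> free = new ArrayDeque<>();

    ExecuteAround(T resource) {
        free.push(resource);
    }

    void use(Consumer<T> user) {
        T resource = free.pop(); // borrow; throws NoSuchElementException if nothing is free
        try {
            user.accept(resource);
        } finally {
            free.push(resource); // handed back even if the user code throws
        }
    }

    int available() {
        return free.size();
    }
}
```

A caller passes the work as a lambda, for example holder.use(s -> send(s)), and never has to remember to give the resource back.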

Now let's look at how to implement getSocket() and returnSocket(). Clearly it involves taking sockets from some sort of collection and putting them back into that collection. A Queue is a good choice here (as others have also noted). It lets you take from one end and return at the other, there are plenty of efficient thread-safe implementations, and takers and adders are typically not in contention with one another. Since you know the number of sockets beforehand, I'd opt for an ArrayBlockingQueue.

An additional concern is that your implementation returns an Optional. I'm not sure what your clients do if no Socket is available, but if they wait and retry, I'd suggest simply making getSocket() block on the queue. As it is, I'll respect this aspect of your approach and take into account that a Socket may not be available. For the execute around approach, this translates into the useSocket() method returning false if no Socket was available.

private final BlockingQueue<Socket> queue;

public SocketPool(Set<Socket> sockets) {
    queue = new ArrayBlockingQueue<>(sockets.size());
    queue.addAll(sockets);
}

public boolean useSocket(Consumer<Socket> socketUser) throws InterruptedException {
    Optional<Socket> maybeSocket = getSocket();
    try {
        maybeSocket.ifPresent(socketUser);
        return maybeSocket.isPresent();
    } finally {
        maybeSocket.ifPresent(this::returnSocket);
    }
}

private void returnSocket(Socket socket) {
    queue.add(socket);
}

private Optional<Socket> getSocket() throws InterruptedException {
    return Optional.ofNullable(queue.poll());
}

There, that's it, that's your SocketPool.
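
If callers would otherwise wait and retry, the borrow can wait on the queue for a bounded time instead of returning immediately. The following is a minimal sketch of that variant, assuming a generic type T in place of Socket. The class name TimedPool and the timeout parameters are illustrative and not part of the pool above.

```java
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical generic pool; T stands in for the 0MQ Socket.
class TimedPool<T> {
    private final BlockingQueue<T> queue;

    TimedPool(Collection<T> resources) {
        // Capacity equals the number of resources, so returning one never fails.
        // Note: ArrayBlockingQueue rejects a capacity of 0, so the collection must be non-empty.
        queue = new ArrayBlockingQueue<>(resources.size(), false, resources);
    }

    // Waits up to the timeout for a free resource, runs the user code on it,
    // and always hands the resource back. Returns false if none became free in time.
    boolean use(Consumer<T> user, long timeout, TimeUnit unit) {
        T resource;
        try {
            resource = queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
        if (resource == null) {
            return false; // timed out: every resource stayed busy
        }
        try {
            user.accept(resource);
            return true;
        } finally {
            queue.add(resource);
        }
    }
}
```

Because each resource is either in the queue or held by exactly one caller, two threads cannot be handed the same one at the same time. The timeout bounds how long a caller waits when all of them are in use.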

Ah, but then comes the tricky bit: checking for liveness. It's tricky because your liveness check actually competes with your regular clients.

To address this, I suggest the following: let your clients report whether the Socket they got was live or not. Since checking for liveness comes down to using the Socket, this should be straightforward for your clients.

So instead of a Consumer<Socket>, we'll take a Function<Socket, Boolean>. If the function returns false, we'll consider the Socket to be no longer live. In that case, rather than adding it back to the regular queue, we add it to a collection of dead Sockets, and a scheduled task rechecks the dead sockets intermittently. Because this happens on a separate collection, the scheduled check no longer competes with regular clients.

Now you can make a SocketManager with a Map that maps data centers to SocketPool instances. This map doesn't need to change, so you can make it final and initialize it in the SocketManager's constructor.
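
As a rough sketch of that manager, the following walks the data centers in preference order and uses the first pool that yields a live resource. The Datacenter enum values, the Pool interface, and the PoolManager name are assumptions made so the example is self-contained. In the design described here, Pool would be SocketPool and the order would come from Datacenters.getOrderedDatacenters().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the question's Datacenters enum and the SocketPool class.
enum Datacenter { PRIMARY, SECONDARY, TERTIARY }

interface Pool<T> {
    // Returns true only if a resource was available and the user reported it live.
    boolean use(Function<T, Boolean> user);
}

class PoolManager<T> {
    // Both fields are final and never modified after construction,
    // so concurrent readers need no extra synchronization.
    private final Map<Datacenter, Pool<T>> poolsByDatacenter;
    private final List<Datacenter> preferenceOrder;

    PoolManager(Map<Datacenter, Pool<T>> pools, List<Datacenter> preferenceOrder) {
        Map<Datacenter, Pool<T>> copy = new EnumMap<>(Datacenter.class);
        copy.putAll(pools);
        this.poolsByDatacenter = Collections.unmodifiableMap(copy);
        this.preferenceOrder = Collections.unmodifiableList(new ArrayList<>(preferenceOrder));
    }

    // Tries each data center in order and stops at the first successful use.
    boolean use(Function<T, Boolean> user) {
        for (Datacenter dc : preferenceOrder) {
            Pool<T> pool = poolsByDatacenter.get(dc);
            if (pool != null && pool.use(user)) {
                return true;
            }
        }
        return false;
    }
}
```

One consequence of this sketch is that when the user function reports a dead socket in one data center, the function runs again on a socket from the next one. That amounts to a retry, and whether it is acceptable depends on the messages being sent.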

This is my preliminary code for SocketPool (untested):

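import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.zeromq.ZMQ.Socket; // assumed: the 0MQ socket type used in the question

class SocketPool implements AutoCloseable {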

    private final BlockingQueue<Socket> queue;
    private final Queue<Socket> deadSockets = new ConcurrentLinkedQueue<>();
    private final ScheduledFuture<?> scheduledFuture;

    public SocketPool(Set<Socket> sockets, ScheduledExecutorService scheduledExecutorService) {
        queue = new ArrayBlockingQueue<>(sockets.size());
        queue.addAll(sockets);
        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::recheckDeadSockets, 60, 60, TimeUnit.SECONDS);
    }

    public boolean useSocket(Function<Socket, Boolean> socketUser) throws InterruptedException {
        Optional<Socket> maybeSocket = getSocket();
        boolean wasLive = true;
        try {
            wasLive = maybeSocket.map(socketUser).orElse(false);
            return wasLive && maybeSocket.isPresent();
        } finally {
            boolean isLive = wasLive;
            maybeSocket.ifPresent(socket -> {
                if (isLive) {
                    returnSocket(socket);
                } else {
                    reportDead(socket);
                }
            });
        }
    }

    private void reportDead(Socket socket) {
        deadSockets.add(socket);
    }

    private void returnSocket(Socket socket) {
        queue.add(socket);
    }

    private Optional<Socket> getSocket() throws InterruptedException {
        return Optional.ofNullable(queue.poll());
    }

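    private void recheckDeadSockets() {
        // Snapshot the size first: the queue changes while we poll and re-add,
        // so comparing against a live size() could end the loop early and skip sockets.
        int toCheck = deadSockets.size();
        for (int i = 0; i < toCheck; i++) {
            Socket socket = deadSockets.poll();
            if (socket == null) {
                break;
            }
            if (checkAlive(socket)) {
                queue.add(socket);
            } else {
                deadSockets.add(socket);
            }
        }
    }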

    private boolean checkAlive(Socket socket) {
        // do actual live check with SendSocket class, or implement directly in this one
        return true;
    }

    @Override
    public void close() throws Exception {
        scheduledFuture.cancel(true);
    }
}
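
To watch the live/dead hand-off without 0MQ, here is a generic sketch that can be run on its own. It assumes a type T in place of Socket and takes the liveness probe as a Predicate parameter. The names LivenessPool, recheckDead, liveCount and deadCount are illustrative. Unlike the code above, this sketch treats an exception from the user function as a failed use, which is a design choice rather than something the approach requires.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical generic pool; T stands in for the 0MQ Socket.
class LivenessPool<T> {
    private final BlockingQueue<T> live;
    private final Queue<T> dead = new ConcurrentLinkedQueue<>();

    LivenessPool(Collection<T> resources) {
        live = new ArrayBlockingQueue<>(resources.size(), false, resources);
    }

    // Returns true only if a resource was free and the user reported it live.
    boolean use(Function<T, Boolean> user) {
        T resource = live.poll();
        if (resource == null) {
            return false; // nothing free right now
        }
        boolean wasLive = false; // stays false if the user code throws
        try {
            wasLive = Boolean.TRUE.equals(user.apply(resource));
            return wasLive;
        } finally {
            if (wasLive) {
                live.add(resource);
            } else {
                dead.add(resource); // parked until a recheck revives it
            }
        }
    }

    // Intended to be called from a single scheduled task.
    void recheckDead(Predicate<T> isAlive) {
        int toCheck = dead.size(); // snapshot, so each parked resource is probed at most once per run
        for (int i = 0; i < toCheck; i++) {
            T resource = dead.poll();
            if (resource == null) {
                return;
            }
            if (isAlive.test(resource)) {
                live.add(resource);
            } else {
                dead.add(resource);
            }
        }
    }

    int liveCount() { return live.size(); }

    int deadCount() { return dead.size(); }
}

class LivenessPoolDemo {
    public static void main(String[] args) {
        LivenessPool<String> pool = new LivenessPool<>(Arrays.asList("a", "b"));
        pool.use(s -> false); // the client reports that its send failed
        System.out.println(pool.liveCount() + " live, " + pool.deadCount() + " dead"); // 1 live, 1 dead
        pool.recheckDead(s -> true); // the probe succeeds, so the resource is revived
        System.out.println(pool.liveCount() + " live, " + pool.deadCount() + " dead"); // 2 live, 0 dead
    }
}
```

When wiring such a recheck into scheduleAtFixedRate, keep in mind that an exception escaping the task suppresses all later runs, so the probe should catch its own failures.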
