【并发】7、借助redis 实现多线程生产消费队列
1、这是第一个简单的初始化版本,看起来比使用fqueue似乎更好用
package queue.redisQueue; import queue.fqueue.vo.TempVo; import redis.clients.jedis.Jedis; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.util.UUID; /** * @ProjectName: cutter-point * @Package: queue.redisQueue * @ClassName: RedisQueueProducter * @Author: xiaof * @Description: ${description} * @Date: 2019/6/12 9:44 * @Version: 1.0 */ public class RedisQueueProducter implements Runnable { private Jedis jedis; private String queueKey; public RedisQueueProducter(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } @Override public void run() { while(true) { try { Thread.sleep(100); //不存在则创建,存在则直接插入 //向redis队列中存放数据 //生成数据 TempVo tempVo = new TempVo(); tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID()); //序列化为字节 ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(arrayOutputStream); objectOutputStream.writeObject(tempVo); arrayOutputStream.flush(); try { int i = 0; while(i < 10) { long num = jedis.lpush(queueKey.getBytes(), arrayOutputStream.toByteArray()); if(num > 0) { System.out.println("成功!"); break; } ++i; } } catch (Exception e) { System.out.println("失败!"); // long num = jedis.lpush(queueKey.getBytes(), arrayOutputStream.toByteArray()); } } catch (Exception e) { e.printStackTrace(); } } } }
消费
package queue.redisQueue; import queue.fqueue.vo.EventVo; import redis.clients.jedis.Jedis; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; /** * @ProjectName: cutter-point * @Package: queue.redisQueue * @ClassName: RedisQueueConsume * @Author: xiaof * @Description: ${description} * @Date: 2019/6/12 10:08 * @Version: 1.0 */ public class RedisQueueConsume implements Runnable { private Jedis jedis; private String queueKey; public RedisQueueConsume(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } @Override public void run() { while(true) { byte bytes[] = null; try{ bytes = jedis.lpop(queueKey.getBytes()); } catch (Exception e) { } //反序列化对象 if(bytes == null || bytes.length <= 0) { Thread.yield(); continue; } ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); try { ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); EventVo eventVo = (EventVo) objectInputStream.readObject(); eventVo.doOperater(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } } }
测试,这里我踩了个坑,切记每个线程最好先获取一次资源,也就是
jedisPool.getResource()
不然再并发操作的时候,2个线程同时使用一个连接,会导致服务无法使用
package queue.redisQueue; import org.junit.Before; import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; /** * @ProjectName: cutter-point * @Package: queue.redisQueue * @ClassName: RedisQueueTest * @Author: xiaof * @Description: ${description} * @Date: 2019/6/12 10:09 * @Version: 1.0 */ public class RedisQueueTest { public static JedisPool jedisPool = null; public static Jedis jedis; @Before public void test0() { //静态块,初始化加载,看来fQueue并不支持多进程操作,但是多线程是支持的 try { if(jedisPool == null) { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10000); jedisPoolConfig.setMaxIdle(2000); jedisPoolConfig.setMaxWaitMillis(2000); jedisPoolConfig.setTestOnBorrow(true); jedisPoolConfig.setTestOnReturn(true); jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 17399, 2000, "ZTE$soft987", 0); } if(jedis == null) { jedis = jedisPool.getResource(); } } catch (Exception e) { e.printStackTrace(); } } @Test public void test1() throws InterruptedException { //读写取数据 for(int i = 0; i < 5; ++i) { System.out.println("输出测试" + i); RedisQueueProducter producter = new RedisQueueProducter(jedisPool.getResource(), "xiaof"); Thread t = new Thread(producter); t.start(); } while(true) { Thread.sleep(1000); } } @Test public void test2() throws InterruptedException { //读写取数据 for(int i = 0; i < 2; ++i) { System.out.println("输出测试" + i); //切记一定要重新获取Resource,不然无法并发操作 RedisQueueConsume fqueueConsume = new RedisQueueConsume(jedisPool.getResource(), "xiaof"); Thread t = new Thread(fqueueConsume); t.setDaemon(true); t.start(); } while(true) { Thread.sleep(1000); } } }
结果:
相关文章