PG(PostgreSQL)在一捅到底架构中的使用

2020-06-17 00:00:00 数据库 支持 转换 也有 数据格式

本想随便说说一捅到底架构,没想到内容还挺多的,要写三篇文章才能盖住全部内容……

传送门:

一捅到底的架构

用Vert.x实现Serverless架构

先说一下传统JEE里面,信息流在不同阶段的格式。

一般是这样的:

  1. 首先浏览器用Form表单提交数据,数据是Post请求的Body,格式大概是key1=value1&key2=value2&key3=value3...这种形式提交到服务器;
  2. 软件服务器比如Tomcat收到之后,将其解析,并封装成一个POJO吧,也就是一个普通的Java对象,其每一个属性名,对应着表单里面的每一个key,值就是属性值,随后将这个POJO转交至Service组件(Component),如果这里有EJB就更麻烦,需要做一层RMI调用;
  3. Service组件拿到这个POJO之后,就去找DAO,或者EJB里面的Entity Bean(JPA的前身),不管是哪一个,我们统一称之为Persistence吧,Persistence拿到之后呢,根据各种配置文件,比如XML或者DAO里面的Annotation,做映射,转换成JDBC的Prepared Statement,底层就是SQL,然后转交给数据库执行。

看这个流程,数据的格式经过了多次转换,Form -> POJO -> ORM -> Table,没有一个相同的,甚至在Service内部,还会有更多层次,每个层次甚至可能会有自己的DTO之类的定义,会做更多层的转换,这些转换非常繁琐,我们经常在一些JEE项目中可以看到,一个Table有几十个甚至上百个字段,可想而知这个转换起来的效率以及软件的维护成本有多高了,后的情况就是开发人员大多数时间都浪费在对这些字段上,而软件运行的大部分时间也都用在了这些数据格式的转换上。

从本质上说,从面向对象的角度出发思考,无论是Form还是POJO还是Table里面的每一条记录,难道不都是对象吗?

既然都是对象,我们是不是有办法统一起来呢?先的改变发生在前端,自从Restful Web Service提出之后,浏览器和软件服务器之间的通信,可以借助Post等Http请求的Body,直接将JSON数据格式予以提交,这样我们就在Form表单以外多了一个JSON数据格式的选择。

其次在Service这一层,各个语言也开始出现对于JSON数据格式的支持,这些支持可能是语言层面的,也有可能是类库,但是不管是哪一种,在Service开发上,也能够支持JSON数据格式鸟。

后是持久层,新兴的NoSQL数据库很多都将JSON作为其存储格式,但是我们并不想完全放弃传统的RDBMS,因为我们曾在数据库上做过不少投资,员工在大学期间学过数据库的课程,工作期间也有过数据库的使用经验,所以如果革命性地将存储转换到不怎么支持join和事务的NoSQL产品上,会面临着比较大的阻力,尤其是涉及到金钱交易,账户信息这些敏感数据,精度要求高,不使用事务等特性会有比较大的风险,所以还是寄希望于传统数据库RDBMS自身的改进,好是在原有的基础之上,新增JSON数据格式的支持。

那答案就是:PG!

PG在9.4版本之后,新增了JSONB数据类型的支持,所谓的JSONB其实就是JSON,只不过之前已经有过一个JSON数据格式了,但是该格式并不支持对JSON做相应的操作,比如建索引等,JSONB支持数据库对其中的Field建索引,可使用SQL语句查询JSON中具体的数据,同时PG支持gin索引,就是Inverted Index Table,针对结构混乱的数据格式,例如JSON,TEXT可用该算法提升查找效率,同时PG还提供了异步接口,Vert.x用户可使用以下异步客户端以提升执行效率:

Vert.x MySQL / PostgreSQL client

mauricio/postgresql-async

当浏览器,软件服务器以及数据库都齐刷刷地支持JSON之后,我们就能轻松地将之前混乱的数据格式统一起来,统一用JSON通信,这样每次操作就可以从浏览器一直捅到数据库,再结合我们之前说过的基于服务的架构,一些简单的服务,用一个Verticle就可以完成全部的CRUD 操作了,是不是很方便呢?

PG对比MySQL和MongoDB在性能上也有明显的优势,有兴趣的可自行测试,懒得做了……

后给出Future写法的AsyncClientVerticle的例子,从代码风格上说,跟Node.js的Promises以及Java自身的CompletableFuture类似,供参考:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.asyncsql.AsyncSQLClient;
import io.vertx.ext.asyncsql.PostgreSQLClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;

public class AsyncClientVerticle extends AbstractVerticle {

    AsyncSQLClient postgreSQLClient;

    public static final int SUCCESS = 1;
    public static final int FAILURE = ;
    public static final int EXISTED = -1;
    public static final int NOT_EXISTED = 2;

    public void start(){
        postgreSQLClient = PostgreSQLClient.createShared(vertx, config());

        vertx.eventBus().<JsonObject>consumer(getClass().getName(), msg ->{
            String method = msg.body().getString("method");
            Future<SQLConnection> connectionFuture = Future.future();
            Future<UpdateResult> updateResultFuture = Future.future();
            Future<ResultSet> resultSetFuture = Future.future();

            postgreSQLClient.getConnection(connectionFuture);
            connectionFuture.setHandler(asyncResult ->{
                if(asyncResult.succeeded()&&(method==null||method.equals("create"))){
                    JsonArray params = new JsonArray().add(msg.body().getString("account"))
                            .add(msg.body().getString("password"));
                    asyncResult.result().updateWithParams("INSERT INTO ACCOUNT(ACCOUNT,PASSWORD) VALUES (?,?)", params,updateResultFuture);
                }else if(asyncResult.succeeded()&&method.equals("read")){
                    JsonArray params = new JsonArray().add(msg.body().getString("account"))
                            .add(msg.body().getString("password"));
                    asyncResult.result().queryWithParams("SELECT * FROM ACCOUNT WHERE ACCOUNT = ? AND PASSWORD = ?", params,resultSetFuture);
                }else{
                    msg.reply(FAILURE);
                }
            });

            updateResultFuture.setHandler(asyncResult->{
                if(asyncResult.succeeded()){
                    msg.reply(SUCCESS);
                }else if(asyncResult.cause().getMessage().contains("duplicate key")){
                    msg.reply(EXISTED);
                }else{
                    msg.reply(FAILURE);
                }
                connectionFuture.result().close();
            });

            resultSetFuture.setHandler(asyncResult ->{
                if(asyncResult.succeeded()&&asyncResult.result().getNumRows()>){
                    msg.reply(SUCCESS);
                }else{
                    msg.reply(FAILURE);
                }
                connectionFuture.result().close();
            });
        });
    }

}

相关文章