Flume自定义Sink到PostgreSQL

2020-06-17 00:00:00 函数 开发 对象 方法 特性

前一篇文章《Flume简介与安装》已经介绍了Flume的相关知识,也提到了Flume的各个环节是解耦的,那么如果实际开发需要自定义开发组件呢?大概流程是怎样的?


下面以输出到PostgreSQL为例说明开发新组件的方法。

首先简单介绍PostgreSQL:

PostgreSQL是一种特性非常齐全的自由软件的对象-关系型数据库管理系统(ORDBMS),是以加州大学计算机系开发的POSTGRES,4.2版本为基础的对象关系型数据库管理系统。POSTGRES的许多领先概念只是在比较迟的时候才出现在商业网站数据库中。PostgreSQL支持大部分的SQL标准并且提供了很多其他现代特性,如复杂查询、外键、触发器、视图、事务完整性、多版本并发控制等。同样,PostgreSQL也可以用许多方法扩展,例如通过增加新的数据类型、函数、操作符、聚集函数、索引方法、过程语言等。另外,因为许可证的灵活,任何人都可以以任何目的免费使用、修改和分发PostgreSQL。


接下来就是开发过程了:


1. 核心代码

sink的实现需要start()、stop()、process()和configure()这四个方法,分别实现启动、结束、处理和取参数的功能。

具体代码如下:

package org.apache.flume.sink.pgsql;

import com.google.common.base.Preconditions;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;

// 类需要继承AbstractSink这个类和Configurable这个接口
public class PGSQLSink extends AbstractSink implements Configurable {
    // 定义相关对象和变量
    private static final Logger logger = LoggerFactory.getLogger(PGSQLSink.class);
    private static final String DEFAULT_SPLIT_CHARS = ",";
    private Connection connection;
    private Statement statement;
    private String hostname;
    private String port;
    private String url;
    private String user;
    private String password;
    private String tablename;
    private String columnName;

// configure方法作用是从配置文件取出相关参数,包括数据库连接等信息
    @Override
    public void configure(Context context) {
        columnName = context.getString("column_name");
        Preconditions.checkNotNull(columnName, "column_name must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        url = "jdbc:postgresql://" + hostname + ":" + port + "/postgres";
        Preconditions.checkNotNull(url, "url must be set!!");
        tablename = context.getString("tablename");
        Preconditions.checkNotNull(tablename, "tablename must be set!!");
    }

    // 在整个sink结束时执行一遍
    @Override
    public synchronized void stop() {
        super.stop();
    }

    // 在整个sink开始时执行一遍
    @Override
    public synchronized void start() {
        super.start();
        try {
            connection = DriverManager.getConnection(url, user, password);
            logger.info("pgsql connected!!!"+connection);
            // 连接URL为 jdbc:mysql//服务器地址/数据库名 ,后面的2个参数分别是登陆用户名和密码
            statement = connection.createStatement();
            logger.info("statement created****"+statement);
        } catch (SQLException e) {
            // 处理异常
            e.printStackTrace();
        }
    }

    // 不断循环调用
    @Override
    public Status process() throws EventDeliveryException{

        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;

        try {
            transaction.begin();
            // 从channel取出event
            event = channel.take();

            if (event != null) {
                String body = new String(event.getBody(),"UTF-8");
                System.out.println("body: "+body);
                System.out.println("columnName: "+columnName);
                // 拼凑出SQL语句
                String sql = "insert into " + tablename + "(" + columnName + ") values(" + body + ")";
                logger.info(sql);
                // 执行SQL语句
                statement.executeUpdate(sql);
                logger.info("sql execute done.");
            } else {
                // event为空的处理
                result = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception ex) {
            // 处理异常
            transaction.rollback();
            throw new EventDeliveryException("Failed to log event: " + event, ex);
        } finally {
            transaction.close();
        }
    // 返回状态
    return result;
    }
}

相关文章