返回到文章

采纳

编辑于

Java从A库读出,写入B库

java
java_shares

由于数据库采用的是blob格式,存储的是sql,dba无法直接进行导标操作,就写了个java程序进行格式转换,从A库导入到B库。

package com.system.dispatch.demo;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DateDemo {

    // JDBC 驱动名及数据库 URL
    static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    static final String DB_URL = "jdbc:mysql://10.0.20.xxx:3306/DBxxxx";

    // 数据库的用户名与密码,需要根据自己的设置
    static final String USER = "xxxxx";
    static final String PASS = "xxxxx";

    private BlockingQueue<DataDTO> blockQueue = new LinkedBlockingQueue<>(10000);

    public static void main(String[] args) throws InterruptedException {
        new DateDemo().start();
        Thread.sleep(10 * 60 * 60 * 1000);
    }

    public void start() {
        Thread readDB = new Thread(this::readDB, "readDB");
        readDB.start();

        Thread writeDB = new Thread(this::wirthDB, "writeDB");
        writeDB.start();
    }

    // 从A库读取
    public void readDB() {
        Connection conn = null;
        Statement stmt = null;
        try {
            // 注册 JDBC 驱动
            Class.forName(JDBC_DRIVER);

            // 打开链接
            System.out.println("连接数据库...");
            conn = DriverManager.getConnection(DB_URL, USER, PASS);

            PreparedStatement pstmst = null;
            ResultSet rs = null;
            int TotalCount = 0;
            int page = 0;
            while (true) {
                System.out.println("");
                String sql = "select sql_id,start_time,sql_content,bind_var from t_data_dbsql_content where 1=1 limit ?,?";
                pstmst = conn.prepareStatement(sql);
                pstmst.setInt(1, page * 1000);
                pstmst.setInt(2, 1000);
                page++;
                long stat_time = System.currentTimeMillis();
                rs = pstmst.executeQuery();
                System.out.println("查询耗时: " + (System.currentTimeMillis() - stat_time));

                int count = 0;
                while (rs.next()) {
                    long sql_id = rs.getLong("sql_id");     //config是blob类型的
                    Date start_time = rs.getDate("start_time");     //config是blob类型的
                    InputStream sql_content = rs.getBinaryStream("sql_content");     //config是blob类型的
                    InputStream bind_var = rs.getBinaryStream("bind_var");     //config是blob类型的

                    DataDTO dataDto = new DataDTO(sql_id, start_time, (ByteArrayInputStream) sql_content, (ByteArrayInputStream) bind_var);
                    blockQueue.put(dataDto);
                    count++;
                    TotalCount++;
                }
                System.out.println("TotalCount==" + TotalCount);
                if (count < 1000) {
                    System.out.println("数据已经查询完成==" + TotalCount);
                    return;
                }
            }
        } catch (SQLException se) {
            // 处理 JDBC 错误
            se.printStackTrace();
        } catch (Exception e) {
            // 处理 Class.forName 错误
            e.printStackTrace();
        } finally {
            // 关闭资源
            try {
                if (stmt != null) stmt.close();
            } catch (SQLException se2) {
            }// 什么都不做
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
    }


    // 写入到B库
    public void wirthDB() {
        Connection conn = null;
        Statement stmt = null;
        ByteArrayInputStream stream = null;
        try {
            // 注册 JDBC 驱动
            Class.forName(JDBC_DRIVER);

            // 打开链接
            System.out.println("连接数据库...");
            conn = DriverManager.getConnection(DB_URL, USER, PASS);
            conn.setAutoCommit(false);

            while (true) {
                PreparedStatement pstmst = null;
                String sql = "insert into t_data_dbsql_content_new_1 (sql_id,start_time,sql_content,bind_var) values(?,?,?,?)";
                pstmst = conn.prepareStatement(sql);

                for (int i = 1; i <= 1000; i++) {
                    DataDTO dataDto = blockQueue.take();

                    pstmst.setLong(1, dataDto.sql_id);
                    pstmst.setDate(2, (Date) dataDto.start_time);
                    pstmst.setBinaryStream(3, dataDto.sql_content);
                    pstmst.setBinaryStream(4, dataDto.bind_var);
                    pstmst.addBatch();
                }

                try {
                    long stat_time = System.currentTimeMillis();
                    pstmst.executeBatch();
                    conn.commit();
                    System.out.println("批量提交耗时: " + (System.currentTimeMillis() - stat_time));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (SQLException se) {
            // 处理 JDBC 错误
            se.printStackTrace();
        } catch (Exception e) {
            // 处理 Class.forName 错误
            e.printStackTrace();
        } finally {
            // 关闭资源
            try {
                if (stmt != null) stmt.close();
            } catch (SQLException se2) {
            }// 什么都不做
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
    }


    // 数据库对象
    class DataDTO {
        private long sql_id;
        private Date start_time;
        private ByteArrayInputStream sql_content;
        private ByteArrayInputStream bind_var;

        public DataDTO(long sql_id, Date start_time, ByteArrayInputStream sql_content, ByteArrayInputStream bind_var) {
            this.sql_id = sql_id;
            this.start_time = start_time;
            this.sql_content = sql_content;
            this.bind_var = bind_var;
        }
    }
}