由于数据库中的 SQL 文本是以 BLOB 格式存储的,DBA 无法直接进行导表操作,于是写了一个 Java 程序进行格式转换,把数据从 A 库导入到 B 库。
package com.system.dispatch.demo;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * One-off migration tool that copies rows from {@code t_data_dbsql_content} into
 * {@code t_data_dbsql_content_new_1}.
 *
 * <p>A reader thread pages through the source table and pushes rows onto a bounded queue; a
 * writer thread drains the queue and inserts the rows in committed batches. The reader always
 * finishes by enqueueing an end-of-data marker so the writer can flush its last, possibly
 * partial, batch and terminate.
 *
 * <p>NOTE(review): both threads connect with the same {@link #DB_URL}; if source and target live
 * in different databases, the two connections need separate URLs/credentials.
 */
public class DateDemo {
    // JDBC driver class name and database URL
    static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    static final String DB_URL = "jdbc:mysql://10.0.20.xxx:3306/DBxxxx";
    // Database user name and password; adjust to your own setup
    static final String USER = "xxxxx";
    static final String PASS = "xxxxx";

    /** Number of rows fetched per SELECT page. */
    private static final int PAGE_SIZE = 1000;
    /** Number of rows sent to the target per executeBatch/commit. */
    private static final int BATCH_SIZE = 1000;
    /** Marker enqueued by the reader to tell the writer that no more rows will arrive. */
    private static final DataDTO END_OF_DATA = new DataDTO(-1L, null, null, null);

    /** Bounded hand-off buffer between the reader and the writer. */
    private final BlockingQueue<DataDTO> blockQueue = new LinkedBlockingQueue<>(10000);
    /** Set once the writer has exited, so the reader does not block forever on a full queue. */
    private volatile boolean writerStopped;
    private Thread readThread;
    private Thread writeThread;

    /**
     * Starts the migration and waits until both worker threads have finished.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        DateDemo demo = new DateDemo();
        demo.start();
        // Wait for the actual work instead of sleeping for a fixed amount of time.
        demo.awaitCompletion();
    }

    /** Launches the reader and writer threads; returns immediately. */
    public void start() {
        readThread = new Thread(this::readDB, "readDB");
        writeThread = new Thread(this::wirthDB, "writeDB");
        readThread.start();
        writeThread.start();
    }

    /** Blocks until the threads created by {@link #start()} have terminated. */
    private void awaitCompletion() throws InterruptedException {
        if (readThread != null) {
            readThread.join();
        }
        if (writeThread != null) {
            writeThread.join();
        }
    }

    /**
     * Reads the source table page by page and enqueues every row for the writer.
     *
     * <p>Errors are logged and end the read; in every case an end-of-data marker is enqueued
     * afterwards so the writer can finish.
     */
    public void readDB() {
        // ORDER BY makes the LIMIT windows well defined; without it the server may return rows in
        // a different order for each page, which skips or duplicates rows.
        // NOTE(review): paging is only fully stable if (sql_id, start_time) is unique - confirm
        // against the table definition.
        String sql = "select sql_id,start_time,sql_content,bind_var from t_data_dbsql_content"
                + " where 1=1 order by sql_id,start_time limit ?,?";
        int totalCount = 0;
        try {
            // Register the JDBC driver
            Class.forName(JDBC_DRIVER);
            // Open the connection
            System.out.println("连接数据库...");
            try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
                 PreparedStatement query = conn.prepareStatement(sql)) {
                for (int page = 0; ; page++) {
                    System.out.println("");
                    // long arithmetic: page * PAGE_SIZE must not overflow on very large tables
                    query.setLong(1, (long) page * PAGE_SIZE);
                    query.setInt(2, PAGE_SIZE);
                    long startedAt = System.currentTimeMillis();
                    int count = 0;
                    try (ResultSet rs = query.executeQuery()) {
                        System.out.println("查询耗时: " + (System.currentTimeMillis() - startedAt));
                        while (rs.next()) {
                            // Copy the BLOBs into byte arrays: a stream obtained from a ResultSet
                            // is only valid until the next getter call / cursor move, so it must
                            // not be handed to another thread.
                            // getTimestamp keeps the time-of-day part that getDate would drop.
                            DataDTO row = new DataDTO(
                                    rs.getLong("sql_id"),
                                    rs.getTimestamp("start_time"),
                                    rs.getBytes("sql_content"),
                                    rs.getBytes("bind_var"));
                            if (!enqueue(row)) {
                                System.err.println("writer has stopped; aborting read after "
                                        + totalCount + " rows");
                                return;
                            }
                            count++;
                            totalCount++;
                        }
                    }
                    System.out.println("TotalCount==" + totalCount);
                    if (count < PAGE_SIZE) {
                        System.out.println("数据已经查询完成==" + totalCount);
                        return;
                    }
                }
            }
        } catch (InterruptedException ie) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (SQLException | ClassNotFoundException e) {
            // JDBC failure or driver class not found
            e.printStackTrace();
        } finally {
            // Runs on success, failure and early return alike.
            signalEndOfData();
        }
    }

    /**
     * Drains the queue and inserts the rows into the target table in batches of
     * {@link #BATCH_SIZE}, committing after each batch. Stops when the end-of-data marker is
     * received, after flushing any remaining rows.
     */
    public void wirthDB() {
        String sql = "insert into t_data_dbsql_content_new_1 (sql_id,start_time,sql_content,bind_var) values(?,?,?,?)";
        try {
            // Register the JDBC driver
            Class.forName(JDBC_DRIVER);
            // Open the connection
            System.out.println("连接数据库...");
            try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
                 PreparedStatement insert = conn.prepareStatement(sql)) {
                conn.setAutoCommit(false);
                int pending = 0;
                while (true) {
                    DataDTO row = blockQueue.take();
                    if (row == END_OF_DATA) {
                        break;
                    }
                    insert.setLong(1, row.sqlId);
                    insert.setTimestamp(2, row.startTime);
                    insert.setBytes(3, row.sqlContent);
                    insert.setBytes(4, row.bindVar);
                    insert.addBatch();
                    pending++;
                    if (pending == BATCH_SIZE) {
                        flushBatch(conn, insert);
                        pending = 0;
                    }
                }
                // The last batch is usually smaller than BATCH_SIZE; it must still be written.
                if (pending > 0) {
                    flushBatch(conn, insert);
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } catch (SQLException | ClassNotFoundException e) {
            // JDBC failure or driver class not found
            e.printStackTrace();
        } finally {
            // Lets a reader that is waiting on a full queue give up instead of hanging.
            writerStopped = true;
        }
    }

    /**
     * Executes and commits the rows currently batched on {@code insert}.
     *
     * <p>A failing batch is logged and rolled back, and the writer carries on with the next
     * batch (best effort, as before); the rollback keeps half-applied rows from being committed
     * together with the following batch.
     *
     * @throws SQLException if the rollback itself fails
     */
    private void flushBatch(Connection conn, PreparedStatement insert) throws SQLException {
        long startedAt = System.currentTimeMillis();
        try {
            insert.executeBatch();
            conn.commit();
            System.out.println("批量提交耗时: " + (System.currentTimeMillis() - startedAt));
        } catch (SQLException e) {
            e.printStackTrace();
            conn.rollback();
            insert.clearBatch();
        }
    }

    /**
     * Puts {@code item} on the queue, waiting while the queue is full.
     *
     * @return {@code true} if the item was enqueued, {@code false} if the writer has stopped
     * @throws InterruptedException if interrupted while waiting for free space
     */
    private boolean enqueue(DataDTO item) throws InterruptedException {
        while (!writerStopped) {
            if (blockQueue.offer(item, 1, TimeUnit.SECONDS)) {
                return true;
            }
        }
        return false;
    }

    /** Tells the writer that the reader is done; a no-op if the writer has already stopped. */
    private void signalEndOfData() {
        try {
            enqueue(END_OF_DATA);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * One row in transit between the reader and the writer. BLOB columns are carried as byte
     * arrays so the row does not depend on the source ResultSet once it has been enqueued.
     */
    static final class DataDTO {
        private final long sqlId;
        private final Timestamp startTime;
        private final byte[] sqlContent;
        private final byte[] bindVar;

        DataDTO(long sqlId, Timestamp startTime, byte[] sqlContent, byte[] bindVar) {
            this.sqlId = sqlId;
            this.startTime = startTime;
            this.sqlContent = sqlContent;
            this.bindVar = bindVar;
        }
    }
}