Java架构-MYSQL大数据量下的操作与优化

 
前言当我们操作MySQL的时候,如果数据量很小,那么我们如何处理都没有问题 。但是当一张表非常大的时候,我们一个大查询,一个堆大插入,一个count(*),一个limit都是非常恐怖的,因此,我在下面说几种常用的优化方式 。
当表数据非常多的时候,我们不能一次把查询结果load进内存中,那会以下就OOM的,需要采用流式读取,也就是Hibernate中的ScrollableResult的方式,它的底层实现就是jdbc的流式读取 。
1. JDBC流式读取 (Hibernate ScrollableResult)
读取操作开始遇到的问题是当sql查询数据量比较大时候程序直接抛错,或是读不出来ResultSet的next方法阻塞 。
Root Casue: mysql driver 默认的行为是需要把整个结果全部读取到内存(ResultSet)中,才允许读取结果 。当遇到大数据的时候,这显然会导致OOM 。这显然与期望的行为不一致,期望的行为是jdbc流的方式读取,当结果从mysql服务端返回后立即开始读取处理 。这样应用就不需要大量内存来存储这个结果集 。
正确的jdbc流式读取代码:
PreparedStatement ps = connection.prepareStatement("select .. from ..",ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); //forward only read only也是mysql 驱动的默认值,所以不指定也是可以的 比如: PreparedStatement ps = connection.prepareStatement("select .. from .."); ps.setFetchSize(Integer.MIN_VALUE); //(也可以修改jdbc url通过defaultFetchSize参数来设置,这样默认所以的返回结果都是通过流方式读取.)ResultSet rs = ps.executeQuery(); while (rs.next()) {System.out.println(rs.getString("fieldName")); }代码分析:下面是mysql判断是否开启流式读取结果的方法,有三个条件forward-only,read-only,fatch size是Integer.MIN_VALUE
/** * We only stream result sets when they are forward-only, read-only, and the * fetch size has been set to Integer.MIN_VALUE * * @return true if this result set should be streamed row at-a-time, rather * than read all at once. */protected boolean createStreamingResultSet() { try { synchronized(checkClosed().getConnectionMutex()) { return ((this.resultSetType == JAVA.sql.ResultSet.TYPE_FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this.fetchSize == Integer.MIN_VALUE)); } } catch (SQLException e) { // we can't break the interface, having this be no-op in case of error is ok return false; }}2. JDBC批量写入
当需要很多的数据一次性写入表中 。如果是一条一条的执行insert来写入,非常慢 。
Root Cause: 第一,单条写入需要大量的Database请求响应交互 。每个insert请求都是一个独立的Transaction commit 。这样网络延迟大的情况下多次请求会有大量的时间消耗的网络延迟上 。第二,是由于每个Transaction,Database都会有刷新磁盘操作写事务日志,保证事务的持久性 。由于每个事务只是写入一条数据,所以磁盘io利用率不高,因为对于磁盘io是按块来的,所以连续写入大量数据效率更好 。
所以,必须改成批量插入的方式,减少请求数与Transaction 。
下面是批量插入的例子:还有jdbc连接串必须加下rewriteBatchedStatements=true
int batchSize = 1000;PreparedStatement ps = connection.prepareStatement("insert into tb1 (c1,c2,c3...) values (?,?,?...)");for (int i = 0; i < list.size(); i++) { ps.setXXX(list.get(i).getC1()); ps.setYYY(list.get(i).getC2()); ps.setZZZ(list.get(i).getC3()); ps.addBatch(); if ((i + 1) % batchSize == 0) { ps.executeBatch(); }}if (list.size() % batchSize != 0) { ps.executeBatch();}上面代码示例是每1000条数据发送一次请求 。mysql驱动内部在应用端会把多次addBatch()的参数合并成一条multi value的insert语句发送给db去执行
比如insert into tb1(c1,c2,c3) values (v1,v2,v3),(v4,v5,v6),(v7,v8,v9)...
这样可以比每条一个insert 明显少很多请求 。减少了网络延迟消耗时间与磁盘io时间,从而提高了tps 。

代码分析: 从代码可以看出,
1 rewriteBatchedStatements=true,insert是参数化语句且不是insert ... select 或者 insert... on duplicate key update with an id=last_insert_id(...)的话会执行
executeBatchedInserts,也就是muti value的方式
2 rewriteBatchedStatements=true 语句是都是参数化(没有addbatch(sql)方式加入的)的而且mysql server版本在4.1以上 语句超过三条,则执行executePreparedBatchAsMultiStatement
就是将多个语句通过;分隔一次提交多条sql 。比如 "insert into tb1(c1,c2,c3) values (v1,v2,v3);insert into tb1(c1,c2,c3) values (v1,v2,v3)..."
3 其余的执行executeBatchSerially,也就是还是一条条处理
public void addBatch(String sql)throws SQLException { synchronized(checkClosed().getConnectionMutex()) { this.batchHasPlainStatements = true; super.addBatch(sql); }}public int[] executeBatch()throws SQLException { //... if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) { if (canRewriteAsMultiValueInsertAtSqlLevel()) { return executeBatchedInserts(batchTimeout); } if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */ ) { return executePreparedBatchAsMultiStatement(batchTimeout); } } return executeBatchSerially(batchTimeout); //.....}


推荐阅读