Java批量导入百万级数据到mysql

需求:把一个500M的txt文件导入mysql数据库,数据量大概有几千万。

1、项目架构

项目采用微服务架构,上传文件的后台管理系统是作为应用层,业务处理的作为服务层,应用层和服务层都有多个节点。

如果是单一节点处理,那么读取500M的文件(可能更大),并把几百万的数据要快速的写入数据库,是非常有难度的。但是我们可以利用微服务多个节点分散处理,即应用层只读取数据,读10000条就放入线程池向服务层发起请求,而服务层有多个节点,可以把应用层的请求平均到多个节点处理,在应用层每个节点先做数据解析,然后插入mysql。

2、服务层处理

2.1 文件读取

文件读取我们用BufferedReader,BufferedReader使用装饰器模式,它的IO行为是每次读进来8K的数据到缓冲区(当然,缓冲区的大小我们是可以通过构造器修改的),如果需要使用数据的时候,再直接从缓冲区里面拿出数据来使用。

而FileReader的read方法,每调用一次就会read一次file,进行一次IO。不管是多次read还是一次性的read,都不是很优雅的在read文件的方式。多次read必然会产生多次IO,一次性的read如果遇到很大的文件,对内存是极不友好的。

所以BufferedReader既能提高的读取速度,又节省了IO的次数,是一种比较优雅的读取文件的方式。
BufferedWriter和FIleWriter同理。

研究一下BufferedReader的源码,就会发现,BufferedReader中对文件的读取还是通过FileReader来实现的,BufferedReader只是对其读取到的数据做一下缓冲,api如下。其中buffer操作的api和java nio中对buffer的操作类似。

image.png

有缓冲区 VS 没有缓冲区

  • 没有缓冲区时,每次读取操作都会导致一次文件读取操作(就是告诉操作系统内核我要读这个文件的这个部分,麻烦你帮我把它取过来)。
  • 有缓冲区时,会一次性读取很多数据,然后按要求分次交给上层调用者。
    读取块大小通常是按最适合硬件的大小来读的,因为对于硬件来说,一次读取一块连续数据(比如 1K)和一次读取一个字节需要的时间几乎是一样的(都是一次读操作,只是最终提交的数据量有差异),所以带缓冲的 I/O 和不带缓冲的相比效率差异是非常显著的

我们这里的需求是顺序读取,如果是随机读取,则使用RandomAccessFile。
所谓随机读取,就是说我们需要自由访问文件的任意位置(指定位置读,指定位置写),所以如果需要访问文件的部分内容,RandomAccessFile将是更好的选择。所以当我们要下载一个大文件时,可以通过多线程使用RandomAccessFile来实现。同样的,对于文件的切割、合并,使用RandomAccessFile效率都会很高。

2.2 业务处理

BufferedReader读取文件后,每读10000行,就丢入线程池,然后调用服务层处理,应用层这里不做任何业务逻辑处理,因为应用层必然是用一个节点处理业务,但是=我们通过http调用服务层后,是可以通过多个节点处理的,这样就可以提升处理效率。


ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        
BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("E:\\code\\esg-cemp-core-nonactivity-01394067\\logs\\system\\system2.log"), StandardCharsets.UTF_8));
       
            List<String> lines = new ArrayList<>();
            String lineTXT="";
            long count = 0;
            while ((lineTXT = bufferedReader.readLine()) != null) {
               if(count % 10000 == 0){
                   executorService.execute(new Runnable() {
                       @Override
                       public void run() {
                           //调用微服务
                           System.out.println("------调用微服务--------");
                       }
                   });
                   lines.clear();
               }
               lines.add(lineTXT);
               count++;
            }

            if(lines.size()>0){
                //调用微服务
                System.out.println("-------调用微服务-------");
            }
            //及时关闭线程池,对于这种使用不是很频繁的线程池使用完毕以后,可以及时关闭以节省资源
           //shutdown关闭以后,就不能再提交任务了,
            threadPoolExecutor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

3、业务层处理

3.1 业务处理

业务处理比较简单,就是把String 切割成多个字段,然后变换成PO对象。

3.2 mysql 插入

  • 方法一:mybatis foreach
    foreach的作用是构建in条件,通过foreach可以在SQL语句中进行迭代一个集合。
    优点是使用方便;
    缺点是每条sql是有长度限制的,所以list的数量跟表字段多少直接关联;

    activityProductUserFlowPoMapper.insertList(list2);
    
    <insert id="insertList" >
      insert into t_non_standard_act_user_flow (act_pro_id, user_id,
        mobile, point, status,
        extend)
      values
      <foreach collection="list" separator="," item="item">
        (#{item.actProId,jdbcType=BIGINT}, #{item.userId,jdbcType=VARCHAR},
        #{item.mobile,jdbcType=VARCHAR}, #{item.point,jdbcType=INTEGER}, #{item.status,jdbcType=TINYINT},
        #{item.extend,jdbcType=VARCHAR})
      </foreach>
    </insert>
    
  • 方法二:mybatis batch
    Mybatis内置的ExecutorType有3种,默认的是simple,该模式下它为每个语句的执行创建一个新的预处理语句,单条提交sql;而batch模式重复使用已经预处理的语句,并且批量执行所有更新语句。

    SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
    Mapper userMapper = sqlSession.getMapper(Mapper.class);
    int size = 10000;
      try {
          for (int i = 0; i < size; i++) {
                  User user = new User();
                  user.setName(String.valueOf(System.currentTimeMillis()));
                  fooMapper.insert(foo);
                 if (i % 1000 == 0 || i == size - 1) {
                      //手动每1000个一提交,提交后无法回滚
                      session.commit();
                      //清理缓存,防止溢出
                     session.clearCache();
                  }
          }
      } catch (Exception e) {
         //没有提交的数据可以回滚
        session.rollback();
      } finally {
        session.close();
      }
    

    需要在jdbc连接url上追加rewriteBatchedStatements=true,否则不起作用

  • 方法三:jdbc batch
    采用PreparedStatement.addBatch()方式实现,发送的是预编译后的SQL语句,执行效率高。

         Long start = System.currentTimeMillis();
          DruidDataSource dataSource = SpringUtils.getBean("dataSource");
          //从连接池中获取connection
          DruidPooledConnection conn =dataSource.getConnection();
          conn.setAutoCommit(false);
          try {
              String sql = "insert into act_user_flow (act_pro_id, user_id, mobile, point, status) values(?,?,?,?,?)";
              PreparedStatement ps = conn.prepareStatement(sql);
    
              for (int i = 0; i < list.size; i++) {
                  ps.setLong(1,list[i].getActProId());
                  ps.setString(2,list[i].getUserId());
                  ps.setString(3,list[i].getMobile());
                  ps.setInt(4,list[i].getPoint());
                  ps.setInt(5,DuobaoGoodEnum.ACTIVITY_PRODUCT_ONLINE.getCode());
                  ps.addBatch();
                  //小批量提交,避免OOM
                  if((i+1) % 1000 == 0) {
                      ps.executeBatch();
                      ps.clearBatch();
                  }
              }
              //提交剩余的数据
              ps.executeBatch();
              conn.commit();
          } catch (SQLException throwables) {
              throwables.printStackTrace();
          } finally {
              //使用完以后放回连接池
              conn.close();
          }
          System.out.println("insert cost time = " + (System.currentTimeMillis()-start));
    

    需要在jdbc连接url上追加rewriteBatchedStatements=true,否则不起作用

  • 性能比较
    同个表插入一万条数据时间近似值:
    JDBC BATCH 1.1秒左右 > Mybatis BATCH 2.2秒左右 >Mybatis foreach 4.5秒左右

    方法二和方法三利用的是mysql的批量提交,需要在jdbc连接url上追加rewriteBatchedStatements=true,如下

    jdbc:mysql://xxx.com.cn/xxx?useUnicode=true&amp;characterEncoding=utf8&allowMultiQueries=true&rewriteBatchedStatements=true
    

    MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。
    只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL
    另外这个选项对INSERT/UPDATE/DELETE都有效

扩展:

  • jdbc连接url上allowMultiQueries=true的作用:
1.可以在sql语句后携带分号,实现多语句执行。
2.可以执行批处理,同时发出多个SQL语句。
  • $ 和#的区别
#{}:是以预编译的形式,将参数设置到sql语句中;PreparedStatement;防止sql注入
${}:取出的值直接拼装在sql语句中;会有安全问题;
如下所示:
select * from tbl_employee where id=${id} and last_name=#{lastName}
Preparing: select * from tbl_employee where id=2 and last_name=?

如果id为 '2 or 1=1',则会发生数据泄露,这就是sql注入。
Preparing: select * from tbl_employee where id=2 or 1=1 and last_name=?
  • 获取DruidDataSource当前活跃连接数
//当前活跃的,即正在被使用的连接数
System.out.println(dataSource.getActiveCount()); ——活跃连接都在activeConnections(Map)中

//最大连接数
System.out.println("========="+dataSource.getMaxActive()); 

//poolingCount值代表剩余可用的连接数,每次从末尾拿走连接
System.out.println(dataSource.getPoolingCount());  ——剩余可用连接都在connections(数组)中

//activeCount + poolingCount只能小于等于maxActive
activeCount + poolingCount <= maxActive

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,406评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,976评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,302评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,366评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,372评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,457评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,872评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,521评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,717评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,523评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,590评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,299评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,859评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,883评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,127评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,760评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,290评论 2 342