Java[工作小总结] 使用RabbitMq异步使用多线程分段处理消费记录表数据上传excel文件并压缩导出

前言

前面讲了统计昨天的数据,分别使用策略模式以及分页查询去处理大量数据的应用(当然其中也可加一些索引,增加查询速度)。
这次主要是讲在原表上做导出操作, 先描述一下需求: 由于当数据较大的时候,直接点击导出按钮,会去查询数据库,有可能查询较慢,导致服务器会出现一些不可因素。因此需要弄一个数据导出的功能(在界面上创建一条记录的时候,用excel先把查询的数据先存放在服务器,限制每个Excel只有1W条或者几W条,然后上传成功之后再点击导出按钮,直接从服务器下载已经统计好的excel,用压缩包下载),然后当过期的时候就删除服务器的文件。并标记该记录已为过期不可导出(这个比较容易,可以弄定时去处理或者当天访问的时候就去做处理)。

涉及方案以及优化方案

方案:(异步方式处理上传文件)MQ + easyExcel + 分页查询 ,下载导出(文件压缩的方式)
优化方案:(异步方式处理上传文件)MQ + easyExcel + 分页查询 + 加多线程批量上传 ,下载导出(文件压缩的方式)

1. 增加pom.xml

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>

        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>3.0.5</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

2. 增加 application.yml rabbitmq 配置


server:
  port: 8083

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.xxx.xxx:3306/xxxxx?characterEncoding=utf-8&useSSL=false
    username: xxxxx
    password: xxxx

  rabbitmq:  # 新
    port: 5672 # 新
    username: xxx # 新
    password: xxx # 新
    virtual-host:  xx # 新
    publisher-returns: true  # 新
    publisher-confirm-type: correlated   # 新
    host: 192.168.xx.xx              # 新

mybatis:
    mapper-locations: classpath:mapper/*.xml
    type-aliases-package: com.xx.xxx
    configuration:
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
      map-underscore-to-camel-case: true
logging:
  level:
     com: debug

3. 增加 rabbitmqConfg 配置

@Configuration
@Slf4j
public class RabbitmqConfig {


   @Bean
   public ConnectionFactory connectionFactory() {
       CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
       connectionFactory.setHost("XXX.XX.X.XX");
       connectionFactory.setPort(XXX);
       connectionFactory.setPassword("XXX");
       connectionFactory.setUsername("XXX");
       connectionFactory.setVirtualHost("XX");
       connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
       connectionFactory.setPublisherReturns(true);
       connectionFactory.createConnection();
       return connectionFactory;
   }

   @Bean
   public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
       SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
       factory.setConnectionFactory(connectionFactory);
       return factory;
   }

   @Bean
   @Qualifier("rabbitTemplate")
   public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
       RabbitTemplate rabbitTemplate = new RabbitTemplate();
       rabbitTemplate.setConnectionFactory(connectionFactory);
       //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
       rabbitTemplate.setMandatory(true);
       // 确认机制
       rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
           @Override
           public void confirm(CorrelationData correlationData, boolean ack, String cause) {
               log.info("ConfirmCallback:     "+"相关数据:"+correlationData);
               log.info("ConfirmCallback:     "+"确认情况:"+ack);
               log.info("ConfirmCallback:     "+"原因:"+cause);
           }
       });
       // 返回机制
       rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
           @Override
           public void returnedMessage(ReturnedMessage returnedMessage) {
               log.info("ReturnCallback:     "+"消息:"+ returnedMessage.getMessage());
               log.info("ReturnCallback:     "+"回应码:"+ returnedMessage.getReplyCode());
               log.info("ReturnCallback:     "+"回应信息:"+ returnedMessage.getReplyText());
               log.info("ReturnCallback:     "+"交换机:"+ returnedMessage.getExchange());
               log.info("ReturnCallback:     "+"路由键:"+ returnedMessage.getRoutingKey());
           }
       });
       return rabbitTemplate;
   }
}

4. 增加 BaseConstant 配置

   // 队列
    String TEST_DIRECT_QUEUE = "testDirectQueue";

    // 交换机
    String TEST_DIRECT_EXCHANGE = "testDirectExchange";

    // 路由键
    String TEST_DIRECT_ROUTING_KEY = "testDirectRouting";




    /**
     * 定义导出最大限制
     */
    public static final Integer COUPLE_CONSUMPTION_UPLOAD_MAX = 30000;


    /**
     * 上传文件
     */
    public static String path = "D:\\exportFolder\\";

5. 增加 Rabbitmq 交换机和队列,路由键配置

使用Direct模式

@Configuration
public class DirectRabbitConfig {

    //队列 起名:testDirectQueue
    @Bean
    public Queue testDirectQueue() {
        return new Queue(BaseConstant.TEST_DIRECT_QUEUE,true);
    }

    //Direct交换机 起名:testDirectExchange
    @Bean
    DirectExchange testDirectExchange() {
        return new DirectExchange(BaseConstant.TEST_DIRECT_EXCHANGE);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:testDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(BaseConstant.TEST_DIRECT_ROUTING_KEY);
    }
}  

6. 增加 entity

    /**
     *  客户端调用接口传递使用
     */
@Data
public class OrderDeatilDTO {

    private Integer id;

    private Integer detailId;  // 订单Idd

    private String payType;  // 消费类型

    private String filePath;  // 路径

    private Date beginDate;    //创建开始时间

    private Date endDate;      //创建结束时间
}



    /**
     *  MQ传递的时候用
     */
@Getter
@Setter
@ToString
public class OrderDetailMessageDTO {

    private Integer id;

    private Integer detailId;  // 订单Idd

    private String payType;  // 消费类型

    private String filePath;  // 路径

    private Date beginDate;    //创建开始时间

    private Date endDate;      //创建结束时间
}

    /**
     *  报表导出
     */
@Data
public class OrderDeatilVO implements Serializable {

    /**
     * 数据ID
     */
    // 主键id
    @ExcelIgnore // 生成报表时忽略,不生成次字段
    private Integer id;
    /**
     * 订单id
     */
    @ExcelProperty(value = "订单编号", index = 0) // 定义表头名称和位置,0代表第一列
    private String orderNo;
    /**
     * 消费的用户
     */
    @ExcelProperty(value = "会员消费ID", index = 1) // 定义表头名称和位置,0代表第一列
    private String payMemberId;
    /**
     * 消费类型
     */
    @ExcelProperty(value = "会员消费类型", index = 2) // 定义表头名称和位置,0代表第一列
    private String payType;
    /**
     * 创建日期
     */
    @ExcelProperty(value = "创建时间", index = 3) // 定义表头名称和位置,0代表第一列
    @DateTimeFormat(value = "yyyy/MM/dd")
    private Date createDate;

    /**
     * 消费的点数、价格
     */
    @ExcelProperty(value = "价格点数", index = 4) // 定义表头名称和位置,0代表第一列
    private Integer actualPrice;
}

@Data
public class OrderDeatilStatisticsDailyQueryHist  implements Serializable {
    private static final long serialVersionUID = -20781826805439994L;
    /**
    * 主键ID
    */
    private Integer id;
    /**
    * 消费类型 vegetables 买菜, meat 买肉 ,    seafood 买海鲜 , furniture  买家具
    */
    private String payType;
    /**
    * 上传文件地址
    */
    private String filePath;
    /**
    * 开始时间
    */
    private Date beginDate;
    /**
    * 结束时间
    */
    private Date endDate;
    /**
    * 状态
    */
    private Integer status;
    /**
    * 创建时间
    */
    private Date createDate;
    /**
    * 更新时间
    */
    private Date updateDate;
    /**
    * 删除标记(0:未删除 1:删除)
    */
    private Boolean delFlag;
    
}

7. 增加 dao

public interface OrderDeatilStatisticsDailyQueryHistDao {

    /**
     * @Description: 通过ID查询单条数据
     * @param orderDeatilStatisticsDailyQueryHist 实例对象
     * @return 实例对象
     */
    OrderDeatilStatisticsDailyQueryHist getOrderDeatilStatisticsDailyQueryHistById(OrderDeatilStatisticsDailyQueryHist orderDeatilStatisticsDailyQueryHist);


    /**
     * @Description: 新增数据
     * @param orderDeatilStatisticsDailyQueryHist 实例对象
     */
    void save(OrderDeatilStatisticsDailyQueryHist orderDeatilStatisticsDailyQueryHist);

}

xml

<mapper namespace="com.mi.dao.OrderDeatilStatisticsDailyQueryHistDao">

    <!--查询单个-->
    <select id="getOrderDeatilStatisticsDailyQueryHistById" resultType="com.mi.entity.OrderDeatilStatisticsDailyQueryHist">
        select
          id, pay_type, file_path, begin_date, end_date, status, create_date, update_date, del_flag
        from order_deatil_statistics_daily_query_hist
        where id = #{id}
    </select>

       <!--通过实体作为筛选条件查询-->
    <select id="findOrderDeatilStatisticsDailyQueryHistList" resultType="com.mi.entity.OrderDeatilStatisticsDailyQueryHist">
        select
          id, pay_type, file_path, begin_date, end_date, status, create_date, update_date, del_flag
        from order_deatil_statistics_daily_query_hist
        <where>
            <if test="id != null">
                and id = #{id}
            </if>
            <if test="payType != null and payType != ''">
                and pay_type = #{payType}
            </if>
            <if test="filePath != null and filePath != ''">
                and file_path = #{filePath}
            </if>
            <if test="beginDate != null">
                and begin_date = #{beginDate}
            </if>
            <if test="endDate != null">
                and end_date = #{endDate}
            </if>
            <if test="status != null">
                and status = #{status}
            </if>
            <if test="createDate != null">
                and create_date = #{createDate}
            </if>
            <if test="updateDate != null">
                and update_date = #{updateDate}
            </if>
            <if test="delFlag != null">
                and del_flag = #{delFlag}
            </if>
        </where>
    </select>


    <!--新增所有列-->
    <insert id="save" keyProperty="id" useGeneratedKeys="true">
        insert into order_deatil_statistics_daily_query_hist(pay_type, file_path, begin_date, end_date, status, create_date, update_date, del_flag)
        values (#{payType}, #{filePath}, #{beginDate}, #{endDate}, #{status}, #{createDate}, #{updateDate}, #{delFlag})
    </insert>

    <!--通过主键修改数据-->
    <update id="update">
        update order_deatil_statistics_daily_query_hist
        <set>
            <if test="payType != null and payType != ''">
                pay_type = #{payType},
            </if>
            <if test="filePath != null and filePath != ''">
                file_path = #{filePath},
            </if>
            <if test="beginDate != null">
                begin_date = #{beginDate},
            </if>
            <if test="endDate != null">
                end_date = #{endDate},
            </if>
            <if test="status != null">
                status = #{status},
            </if>
            <if test="createDate != null">
                create_date = #{createDate},
            </if>
            <if test="updateDate != null">
                update_date = #{updateDate},
            </if>
            <if test="delFlag != null">
                del_flag = #{delFlag},
            </if>
        </set>
        where id = #{id}
    </update>

</mapper>

public interface OrderDeatilDao {
    /**
     * 分页查询返回集合  按天以每页500调或更少条数查询 ,以此减少对数据量查询时间的压力
     * @param orderDeatilDTO   参数,开始时间和结束时间
     * @param pageInfo 分页
     * @return
     */
    List<OrderDeatil> findOrderDeatilByDay(@Param("pageInfo") PageInfo pageInfo,@Param("deatilDTO")OrderDeatilDTO orderDeatilDTO);
}

xml

   <select id="findOrderDeatilByDay" resultType="com.mi.entity.OrderDeatil">
       select
          id, order_no,pay_member_id,pay_type,actual_price,create_date, del_flag
        from order_deatil
        where del_flag = 0
        <if test="deatilDTO.beginDate != null">
            and create_date &gt;= #{deatilDTO.beginDate}
        </if>
        <if test="deatilDTO.endDate != null">
            and create_date &lt;= #{deatilDTO.endDate}
        </if>
        <if test="deatilDTO.payType != null">
            and pay_type = #{deatilDTO.payType}
        </if>
        order by create_date
        <if test="pageInfo !=null">
            limit ${pageInfo.offset},${pageInfo.pageSize}
        </if>
    </select>

7. 增加 DirectService

public interface DirectService {

    /**
     * 发送
     * @param orderMessageDTO
     */
    public void sendMessage(OrderDetailMessageDTO orderMessageDTO);


    /**
     * 接收
     * @param message
     */
    public void reciveMessage(Message message);
}

8. 增加 DirectServiceImpl

注 : 这个可能有点问题,因为在同一个服务器上,不知道能不能发送队列后消费到

@Slf4j
@Service
public class DirectServiceImpl  implements DirectService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private OrderDeatilStatisticsExcelService orderDeatilStatisticsExcelService;

    ObjectMapper objectMapper = new ObjectMapper();

    /**
     *
     * @param orderMessageDTO
     */
    @Override
    public void sendMessage(OrderDetailMessageDTO orderMessageDTO) {
        try {
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
            // 发送端确认是否确认消费
            CorrelationData correlationData = new CorrelationData();
            // 唯一ID
            correlationData.setId(orderMessageDTO.getId().toString());
            // 发送
            rabbitTemplate.convertAndSend(BaseConstant.TEST_DIRECT_EXCHANGE,BaseConstant.TEST_DIRECT_ROUTING_KEY,messageToSend,correlationData);
            log.info("发送成功");
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 处理上传业务逻辑
     * @param message
     */
    @RabbitListener(
            containerFactory = "rabbitListenerContainerFactory",
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = BaseConstant.TEST_DIRECT_QUEUE),
                            exchange = @Exchange(name = BaseConstant.TEST_DIRECT_EXCHANGE),
                            key = BaseConstant.TEST_DIRECT_ROUTING_KEY
                    )
            }
    )
    @Override
    public void reciveMessage(@Payload Message message) {
        log.info("========direct接受消息===========");
        String messageBody = new String(message.getBody());
        log.info(" body = {} " ,messageBody);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            OrderDeatilDTO dto = objectMapper.readValue(messageBody, OrderDeatilDTO.class);
            log.info(" =========== OrderDetailMessageDTO  {} ",dto);
            // 处理excel方法过程
            try {
                orderDeatilStatisticsExcelService.exportExcel(dto);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

9. 增加 FileService

public interface FileService {
    /**
     * 文件上传
     * @param inputStream
     * @param filename
     */
    void upload(InputStream inputStream, String filename);
}

10. 增加 LocalFileServiceImpl

@Slf4j
@Service("localFileServiceImpl")
public class LocalFileServiceImpl implements FileService {



    @Override
    public void upload(InputStream inputStream, String filename) {
        // 拼接文件的存储文件
        String storagePath = BaseConstant.path + "/" + filename;
        try (
                // JDK8 TWR 不能关闭外部资源的
                InputStream innerInputStram =
                        inputStream;
                FileOutputStream outputStream =
                        new FileOutputStream(new File(storagePath));
                //
        )
        {
            //拷贝缓冲区
            byte [] buffer = new byte[1024];
            // 读取文件流长度
            int len;
            // 循环读取 inputStream中数据写入到outputStream
            while ((len = innerInputStram.read(buffer)) > 0 ){
                outputStream.write(buffer,0,len);
            }
            // 冲刷流
            outputStream.flush();

        }catch (Exception e){
            log.error("文件上传失败!",e);
            e.printStackTrace();
        }
    }
}

11. 增加 OrderDeatilStatisticsExcelService

public interface OrderDeatilStatisticsDailyQueryHistService {

    /**
     * @Description: 通过ID查询单条数据
     * @param orderDeatilStatisticsDailyQueryHist 实例对象
     * @return 实例对象
     */
    OrderDeatilStatisticsDailyQueryHist getOrderDeatilStatisticsDailyQueryHistById(OrderDeatilStatisticsDailyQueryHist orderDeatilStatisticsDailyQueryHist);


    /**
     * @Description: 新增数据
     * @param orderDeatilStatisticsDailyQueryHist 实例对象
     */
    void save(OrderDeatilStatisticsDailyQueryHist orderDeatilStatisticsDailyQueryHist);

}

12. 增加 OrderDeatilStatisticsDailyQueryHistServiceImpl

@Service("orderDeatilStatisticsDailyQueryHistService")
public class OrderDeatilStatisticsDailyQueryHistServiceImpl implements OrderDeatilStatisticsDailyQueryHistService {
    @Autowired
    private OrderDeatilStatisticsDailyQueryHistDao orderDeatilStatisticsDailyQueryHistDao;

    /**
     * @Description: 通过ID查询单条数据
     * @param orderDeatilStatisticsDailyQueryHist 实例对象
     * @return 实例对象
     */
    @Override
    public OrderDeatilStatisticsDailyQueryHist getOrderDeatilStatisticsDailyQueryHistById(OrderDeatilStatisticsDailyQueryHist orderDeatilStatisticsDailyQueryHist) {
        return this.orderDeatilStatisticsDailyQueryHistDao.getOrderDeatilStatisticsDailyQueryHistById(orderDeatilStatisticsDailyQueryHist);
    }
    /**
     * @Description: 新增数据
     * @param orderDeatilStatisticsDailyQueryHist 实例对象
     */
    @Override
    @Transactional
    public void save(OrderDeatilStatisticsDailyQueryHist orderDeatilStatisticsDailyQueryHist) {
        this.orderDeatilStatisticsDailyQueryHistDao.save(orderDeatilStatisticsDailyQueryHist);
    }
}

13. 增加 OrderDeatilStatisticsExcelService

注 : 主要处理导出相关功能接口

public interface OrderDeatilStatisticsExcelService {


    /**
     * 上传文件到服务器
     * @param orderDeatilDTO
     * @throws ParseException
     */
    public void exportExcel(OrderDeatilDTO orderDeatilDTO) throws ParseException;


    /**
     * 创建导出记录
     * @param orderDeatilDTO
     */
    void uploadExcel(OrderDeatilDTO orderDeatilDTO);


    /**
     * 下载压缩包
     * @param hist
     * @param request
     * @param response
     */
    void downLoad(OrderDeatilStatisticsDailyQueryHist hist, HttpServletRequest request, HttpServletResponse response);


    /**
     * 获取目录下所有
     * @param uploadPath
     * @return
     */
    List<File> getFilePath(String uploadPath);
}

14. OrderDeatilStatisticsExcelServiceImpl

注:这里有两种方案:
(1). 使用线程(模拟20W数据) 耗时: 2234
(2). 没有使用线程(模拟20W数据) 耗时: 5034

![image.png](https://upload-images.jianshu.io/upload_images/20862633-67657246c4facfa3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
@Slf4j
@Service
public class OrderDeatilStatisticsExcelServiceImpl implements OrderDeatilStatisticsExcelService {

    @Autowired
    private OrderDeatilDao orderDeatilDao;

    @Autowired
    private OrderDeatilStatisticsDailyQueryHistService orderDeatilStatisticsDailyQueryHistService;


    @Autowired
    private DirectService directService;
    
    /**
     * 处理导出数据
     *
     * @return
     */
    @Override
    public void exportExcel(OrderDeatilDTO orderDeatilDTO) throws ParseException {

        //获取拆分好的按天集合
        String beginDate = DateUtil.formatDate(orderDeatilDTO.getBeginDate(), "yyyy-MM-dd");
        String endDate = DateUtil.formatDate(orderDeatilDTO.getEndDate(), "yyyy-MM-dd");
        List<String> days = SplitDateUtil.getDayBetweenDates(beginDate, endDate);
        log.info(" ============= beginDate = {}  ,  endDate = {}  , days = {} , days of the size = {}", beginDate, endDate, days, days.size());

        // 命名页数
        int n = 1;

        for (int i = 0; i < days.size(); i++) {
            // 开始时间 00:00:00
            orderDeatilDTO.setBeginDate(DateTool.getStartOfDay(DateUtils.parseDate(days.get(i), "yyyy-MM-dd")));
            // 结束时间 23:59:59
            orderDeatilDTO.setEndDate(DateTool.getEndOfDay(DateUtils.parseDate(days.get(i), "yyyy-MM-dd")));
            //获取开始时间
            long startTime = System.currentTimeMillis();

            List<OrderDeatilVO> deatilList = new ArrayList<>();

            List<OrderDeatil> orderDeatils = null;
            PageInfo pageInfo = new PageInfo(0, 500);
            do {
                pageInfo.setOffset((pageInfo.getPageNo() - 1) * pageInfo.getPageSize());
                orderDeatils = orderDeatilDao.findOrderDeatilByDay(pageInfo, orderDeatilDTO);
                pageInfo.setPageNo(pageInfo.getPageNo() + 1);

                // 把查询的数据装进去实体类,以便直接装进excel
                for (OrderDeatil orderDeatil : orderDeatils) {
                    OrderDeatilVO deatilVO = new OrderDeatilVO();
                    BeanUtils.copyProperties(orderDeatil, deatilVO);
                    deatilList.add(deatilVO);
                }
                // 假如没有查询数据,可以试试用内存装进去
//            for (int j = 0 ; j<30000; j++){
//                OrderDeatilVO deatilVO = new OrderDeatilVO();
//                deatilVO.setId(j);
//                deatilVO.setPayType("vegetables");
//                deatilVO.setActualPrice(20);
//                deatilVO.setOrderNo("NO"+j);
//                deatilVO.setCreateDate(new Date());
//                deatilVO.setPayMemberId("memberNO" + j);
//                deatilList.add(deatilVO);
//            }
                // 要是为了怕内存溢出,限制在3W条左右就分批写入,按每1W条分批写入一个excel
                // 按每个1W数据去上传excel ||  页数最后一条数据 && 遍历处理月份最后的的长度相等
                if (deatilList.size() >= BaseConstant.COUPLE_CONSUMPTION_UPLOAD_MAX
//               || (pageInfo.getPageNo() == pageInfo.getLast() && i+1 == days.size())   这块这边没有分页的最后一页
                ) {
                    // 2.1  没有创建线程去处理写入
//        String rootupload = orderDeatilDTO.getFilePath() + "\\";
////        String fileName = "订单详情表" + begin + " --" + end + ".xlsx";
//        List<List<OrderDeatilVO>> partition = Lists.partition(deatilList, 10000);
//        for (int j = 0; j < partition.size(); j++) {
//            String fileName = rootupload + "订单详情表" + orderDeatilDTO.getBeginDate() + " --" + orderDeatilDTO.getEndDate() + ".xlsx" + "(" +i+")";
////            this.export(partition.get(j),fileName);
//        }
//        long endTime = System.currentTimeMillis();
//        log.info("  ===== {}  耗时:",(endTime - startTime));
//                     2.2  优化方案创建线程去处理写入
                     ImportTask.batchDeal(deatilList,10000,orderDeatilDTO);
                    // 3. 清掉内存
                    deatilList.clear();
                }
            } while (CollectionUtils.isNotEmpty(orderDeatils));
        }
    }

    /**
     * 处理上传excel文件
     *
     * @param orderDeatilDTO
     */
    @Override
    public void uploadExcel(OrderDeatilDTO orderDeatilDTO) {

        //  创建记录
        OrderDeatilStatisticsDailyQueryHist queryHist = new OrderDeatilStatisticsDailyQueryHist();
        queryHist.setPayType(orderDeatilDTO.getPayType());
        queryHist.setBeginDate(orderDeatilDTO.getBeginDate());
        queryHist.setEndDate(orderDeatilDTO.getEndDate());
        queryHist.setFilePath("" + System.currentTimeMillis());
        queryHist.setStatus(0);
        orderDeatilStatisticsDailyQueryHistService.save(queryHist);
        // 1. 通过异步的方式去处理      - 两步操作: 第一步:线程异步,    第二步用MQ的方式 ?
        OrderDetailMessageDTO messageDTO = new OrderDetailMessageDTO();
        messageDTO.setId(queryHist.getId());
        messageDTO.setPayType(queryHist.getPayType());
        messageDTO.setBeginDate(orderDeatilDTO.getBeginDate());
        messageDTO.setEndDate(orderDeatilDTO.getEndDate());
        messageDTO.setFilePath(queryHist.getFilePath());
        // 1. 通过异步的方式去处理 MQ
        directService.sendMessage(messageDTO);
    }


    /**
     * 执行数据库查询和Excel导出,将数据写入到outputStream中
     *
     * @param outputStream
     * @param orderDeatilVOS
     */
    private void export(OutputStream outputStream, List<OrderDeatilVO> orderDeatilVOS, String sheetName) {

        // 1. 需要创建一个EasyExcel导出对象
        ExcelWriter excelWriter = EasyExcelFactory.write(
                outputStream, OrderDeatilVO.class)
                .build();

        // 将数据写入到不通sheet页面中
        WriteSheet writeSheet = EasyExcelFactory.writerSheet("订单详情信息").build();
        excelWriter.write(orderDeatilVOS, writeSheet);
        // 4. 收尾 执行finish,才会关闭Excel文件流
        excelWriter.finish();
        log.info("完成导出!");
    }


    /**
     * @param hist
     * @param request
     * @param response
     */
    @Override
    public void downLoad(OrderDeatilStatisticsDailyQueryHist hist, HttpServletRequest request, HttpServletResponse response) {
        log.info("===============downLoad ============================= hist = {}", JSONObject.toJSON(hist));

        // 文件目录地址
        String excelsPath = hist.getFilePath();
        // 压缩后的文件名
        String fileName = BaseConstant.path + excelsPath + ".zip";
        //  具体文件夹地址
        String path = BaseConstant.path + excelsPath;
        // 获取目录下所有文件
        List<File> filePath = this.getFilePath(path);

        File filep = new File(fileName);
        // 文件输出流
        FileOutputStream outStream = null;
        try {
            outStream = new FileOutputStream(filep);
            // 压缩流
            ZipOutputStream toClient = new ZipOutputStream(outStream);
            // 设置压缩文件内的字符编码,不然会变成乱码
            ZipUtil.zipFile(filePath, toClient);
            toClient.close();
            outStream.close();
            // 文件压缩下载
            ZipUtil.downloadZip(filep, response);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ServletException e) {
            e.printStackTrace();
        }
    }

    @Override
    public List<File> getFilePath(String uploadPath) {
        File file = new File(uploadPath);
        // 获取目录下所有文件地址
        if (ArrayUtils.isNotEmpty(file.list())) {
            List<String> files = Arrays.asList(file.list());
            // 拼出所有地址
            List<File> filePath = new ArrayList<>();
            for (String str : files) {
                File file2 = new File(uploadPath + File.separator + str);
                filePath.add(file2);
            }
            return filePath;
        }
        return null;
    }
}

15. 多线程工具类

方案优化
使用了:CountDownLatch 方式 去分批上传文件

简单介绍:

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。这里就不在阐述。

@Slf4j
public class ImportTask implements Runnable {

    private List data;

    private CountDownLatch countDownLatch;

    private OrderDeatilDTO orderDeatilDTO;


    private Integer i;

    public ImportTask(List<OrderDeatilVO> data, CountDownLatch countDownLatch,OrderDeatilDTO orderDeatilDTO,int i) {
        this.data = data;
        this.countDownLatch = countDownLatch;
        this.orderDeatilDTO = orderDeatilDTO;
        this.i =i ;
    }

    /**
     * 业务处理逻辑
     */
    @Override
    public void run() {
        if (null != data) {
            // 业务处理
            log.info(" ===== {} ", data.size());
            String begin = DateUtil.formatDate(orderDeatilDTO.getBeginDate(), BaseConstant.DATE_FORMAT_YYYY_MM_DD);
            String end = DateUtil.formatDate(orderDeatilDTO.getEndDate(), BaseConstant.DATE_FORMAT_YYYY_MM_DD);
            String fileName = "订单详情表("+(i+1)+") " + begin + "--" + end + ".xlsx";
            // 批量上传文件
            this.export(data,fileName,orderDeatilDTO);
        }
        // 计数器减1
        countDownLatch.countDown();
    }

    /**
     * 批量上传逻辑
     * @param data  集合数据
     * @param bachNum    线程按每次执行多少条数
     * @param orderDeatilDTO
     */
    public static void batchDeal(List data , int bachNum , OrderDeatilDTO orderDeatilDTO){
        OrderDeatilStatisticsExcelService orderDeatilStatisticsExcelService = null;
        int totalNum = data.size();
        int pageNum = totalNum % bachNum == 0 ? totalNum / bachNum : totalNum / bachNum + 1;
        ExecutorService executorService = Executors.newFixedThreadPool(pageNum);
        try {
            long startTime = System.currentTimeMillis();
            CountDownLatch countDownLatch = new CountDownLatch(pageNum);
            List<OrderDeatilVO> subData = null;
            int fromIndex, toIndex;
            for (int i = 0 ; i <pageNum ; i++ ){
                fromIndex = i * bachNum;
                toIndex = Math.min(totalNum,fromIndex + bachNum);
                subData = data.subList(fromIndex,toIndex);
                ImportTask task = new ImportTask(subData,countDownLatch,orderDeatilDTO,i);
                executorService.execute(task);
            }
            // 进行阻塞,直到计数器为0 的时候才会执行
            countDownLatch.await();
            long endTime = System.currentTimeMillis();
            log.info(" 数据操作完成...   耗时:  {} ",(endTime - startTime));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }


    /**
     * 导出相关excel
     * @param data 集合数据
     * @param fileName  文件名
     * @param orderDeatilDTO
     */
    private void  export(List<OrderDeatilVO> data,String fileName,OrderDeatilDTO orderDeatilDTO){
        ByteArrayOutputStream outputStream =
                new ByteArrayOutputStream();

        // 1. 实现数据导出的Excel
        export(outputStream,data,"订单详情表");

        ByteArrayInputStream inputStream =
                new ByteArrayInputStream(outputStream.toByteArray());

        // 2. 上传文件处理上传文件目录
        String fileRequestPar = BaseConstant.path + orderDeatilDTO.getFilePath() ;
        fileRequestPar = fileRequestPar.replace("\\", "/");
        log.info(" fileRequestPar = {} " ,fileRequestPar);
        File filePath = new File(fileRequestPar);
        log.info(" filePath = {} " ,filePath);
        if (!filePath.exists()) { //若此目录不存在,则创建
            filePath.mkdirs();
        }

        // 3.  上传文件地址
        String uploadFile  =  BaseConstant.path + orderDeatilDTO.getFilePath() +"\\" + fileName;
        uploadFile = uploadFile.replace("\\", "/");
        log.info(" uploadFile = {}" ,uploadFile);
        File mFile = new File(uploadFile);
        log.info(" mFile = {}" ,mFile);
        if (mFile.exists()) {
            mFile.mkdirs();
        }
        String  fileExcelName = orderDeatilDTO.getFilePath() +"\\" + fileName;
        fileExcelName = fileExcelName.replace("\\", "/");
        FileUtils.upload(inputStream,fileExcelName);
    }

    /**
     *  处理excel导出的对象
     * @param outputStream
     * @param orderDeatilVOS
     * @param sheetName
     */
    private void export(OutputStream outputStream, List<OrderDeatilVO> orderDeatilVOS, String sheetName){

        // 1. 需要创建一个EasyExcel导出对象
        ExcelWriter excelWriter = EasyExcelFactory.write(
                outputStream, OrderDeatilVO.class)
                .build();
        // 将数据写入到不通sheet页面中
        WriteSheet writeSheet = EasyExcelFactory.writerSheet(sheetName).build();
        excelWriter.write(orderDeatilVOS,writeSheet);
        // 4. 收尾 执行finish,才会关闭Excel文件流
        excelWriter.finish();
        log.info("完成导出!");
    }
}

16. 导出相关工具类

(1). 压缩文件

package com.mi.utils;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
 * @Author Leon
 * @Date 2021/9/6 19:21
 * @Describe:
 */
public class ZipUtil {

    /**
     * 压缩文件列表中的文件
     *
     * @param files
     * @param outputStream
     * @throws IOException
     */
    public static void zipFile(List files, ZipOutputStream outputStream) throws IOException, ServletException {
        try {
            int size = files.size();
            //压缩列表中的文件
            for (int i = 0; i < size; i++) {
                File file = (File)(files.get(i));
                try {
                    zipFile(file, outputStream);
                } catch (Exception e) {
                    continue;
                }
            }
        } catch (Exception e) {
            throw e;
        }
    }

    /**
     * 将文件写入到zip文件中
     *
     * @param inputFile
     * @param outputstream
     * @throws Exception
     */
    public static void zipFile(File inputFile, ZipOutputStream outputstream) throws IOException, ServletException {
        try {
            if (inputFile.exists()) {
                if (inputFile.isFile()) {
                    FileInputStream inStream = new FileInputStream(inputFile);
                    BufferedInputStream bInStream = new BufferedInputStream(inStream);
                    ZipEntry entry = new ZipEntry(inputFile.getName());
                    outputstream.putNextEntry(entry);

                    final int MAX_BYTE = 10 * 1024 * 1024;    //最大的流为10M
                    long streamTotal = 0;                      //接受流的容量
                    int streamNum = 0;                      //流需要分开的数量
                    int leaveByte = 0;                      //文件剩下的字符数
                    byte[] inOutbyte;                          //byte数组接受文件的数据

                    streamTotal = bInStream.available();                        //通过available方法取得流的最大字符数
                    streamNum = (int) Math.floor(streamTotal / MAX_BYTE);    //取得流文件需要分开的数量
                    leaveByte = (int) streamTotal % MAX_BYTE;                //分开文件之后,剩余的数量

                    if (streamNum > 0) {
                        for (int j = 0; j < streamNum; ++j) {
                            inOutbyte = new byte[MAX_BYTE];
                            //读入流,保存在byte数组
                            bInStream.read(inOutbyte, 0, MAX_BYTE);
                            outputstream.write(inOutbyte, 0, MAX_BYTE);  //写出流
                        }
                    }
                    //写出剩下的流数据
                    inOutbyte = new byte[leaveByte];
                    bInStream.read(inOutbyte, 0, leaveByte);
                    outputstream.write(inOutbyte);
                    outputstream.closeEntry();     //Closes the current ZIP entry and positions the stream for writing the next entry
                    bInStream.close();    //关闭
                    inStream.close();
                }
            } else {
                throw new ServletException("文件不存在!");
            }
        } catch (IOException e) {
            throw e;
        }
    }

    /**
     * 下载打包的文件
     *
     * @param file
     * @param response
     */
    public static void downloadZip(File file, HttpServletResponse response) {
        try {
            // 以流的形式下载文件。
            BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file.getPath()));
            byte[] buffer = new byte[fis.available()];
            fis.read(buffer);
            fis.close();
            // 清空response
            response.reset();

            OutputStream toClient = new BufferedOutputStream(response.getOutputStream());
            response.setContentType("application/octet-stream");
            response.setHeader("Content-Disposition", "attachment;filename=" + file.getName());
            toClient.write(buffer);
            toClient.flush();
            toClient.close();
            file.delete();        //将生成的服务器端文件删除
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

}

(2) . 处理文件逻辑 FileUtils

public class FileUtils {


    /**
     * 处理文件
     * @param inputStream
     * @param filename
     */
    public static void upload(InputStream inputStream, String filename) {
        // 拼接文件的存储文件
        String storagePath = BaseConstant.path + "/" + filename;
        try (
                // JDK8 TWR 不能关闭外部资源的
                InputStream innerInputStram =
                        inputStream;
                FileOutputStream outputStream =
                        new FileOutputStream(new File(storagePath));
                //
        )
        {
            //拷贝缓冲区
            byte [] buffer = new byte[1024];
            // 读取文件流长度
            int len;
            // 循环读取 inputStream中数据写入到outputStream
            while ((len = innerInputStram.read(buffer)) > 0 ){
                outputStream.write(buffer,0,len);
            }
            // 冲刷流
            outputStream.flush();

        }catch (Exception e){
            log.error("文件上传失败!",e);
            e.printStackTrace();
        }
    }
}

结语

记录前面的统计需求后续的文件上传功能,目的主要是在页面上创建一条记录,根据时间或一些其它条件先将这些数据生成excel存放在服务器上或其它空间,当上传完成后,点击导出的操作后直接下载一个压缩文件下来(这样做的好处是将已经处理好的数据放在某个地方,需要用的时候直接导出来,不用查询了再去导出来)。由于一开始只是用异步MQ的方式去处理,然后通过遍历的方式去处理,但后面可以使用异步MQ + 多线程的方式批量去处理(达到某个条数后,按条数开启一个线程去上传,这样就可以更快的上传)。
当然到后面的时候数据量更大的时候又要怎么处理呢(还是按分页查询和多线程分批处理,是不过可能会在数据库方面做一些方案处理,例如分库分表之类的,这样可能会提高查询的效率,暂时就是为后面做一个扩展思考)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342