欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

大数据量上传FTP

背景

笔者有一个需求是把将近一亿条数据上传到FTP服务器中,这些数据目前是存储在mysql中,是通过关联几张表查询出来的,查询出来的数据结果集一共是6个字段。要求传输的时候拆分成一个个小文件,每个文件大小不能超过500M。我的测试思路是对测试数据进行分页查询,比如每次分页查询10万条数据,写入到一个txt格式的文件中,攒到50万条数据时,把这个txt文件上传到Ftp中(粗略估算了一下,每个字段长度假设不超过255,),这就是一个小文件的上传。

一、windows下FTP的安装

笔者的开发环境是windows11,所以必须要搭建一个FTP环境以供测试使用

配置IIS web服务器

打开运行窗口【win+R】快捷键,输入 optionalfeatures 后点击确定:
在这里插入图片描述
在出来的弹框中找到Internet信息服务,并打开勾选以下配置 ,点击确定,等待windows系统自行添加相关应用配置
在这里插入图片描述

配置IIS web站点

现在本地磁盘创建一个FtpServer空文件夹
在这里插入图片描述
然后查看本机IP地址
打开运行【win+R】窗口输入cmd回车
然后输入ipconfig 查看IP
笔者本机连接的是无线网络,如果是连接的有线网络,则需要找对应的以太网适配器连接配置
在这里插入图片描述
接着 在开始栏中搜索 IIS 并点击进入IIS管理器
在这里插入图片描述
打开后在左侧 “网站” 右键菜单 打开 “添加FTP站点”
主要是填写FTP站点名称和服务的物理路径

在这里插入图片描述
点击下一页,填写本机当前网络的ip地址
在这里插入图片描述
再点下一页完成身份验证和授权信息
在这里插入图片描述
点击完成后,ftp服务器的windows搭建就结束了

打开防火墙,把以下服务勾选上
在这里插入图片描述
建立 FTP 服务之后,默认登陆 FTP 服务器的账号和密码就是本机 Administrator 的账户和密码,但是笔者不记得密码了,所以创建一个用户来管理FTP登录

此电脑->右击->显示更多选项->单击管理->本地用户和用户组->用户->右击创建新用户

在这里插入图片描述
ftp用户名和密码记好了
在这里插入图片描述
再在开始菜单找到IIS服务,点击FTP授权规则
在这里插入图片描述
右击编辑权限
在这里插入图片描述
在这里插入图片描述
点击添加
在这里插入图片描述
输入刚才创建的ftp用户名称,点击检查名称
在这里插入图片描述
把下面的权限都勾选上,点击确定
在这里插入图片描述
回到 Internet Information Services (IIS) 管理器,双击刚才选中的 “FTP授权规则”,点击右侧的”添加允许规则”

在这里插入图片描述
然后别忘了启动ftp,右击管理ftp站点,启动
在这里插入图片描述

登录ftp

地址是ftp://192.168.1.105,进入此电脑,输入地址回车
在这里插入图片描述
在这里插入图片描述
输入用户名和密码可以登录

至于浏览器访问,这在很早之前是可以的,但是后来各大浏览器厂商都禁止使用浏览器访问ftp资源,这里也就作罢了

更换ftp的ip

当本机网络环境发生改变时,比如无线网环境变了,导致ip地址变了,那么之前设置好的ip地址就失效了,ftp无法连接。
点开IIS管理器,点击绑定
在这里插入图片描述
点击编辑,修改IP地址即可
在这里插入图片描述

二、java连接ftp服务器

笔者使用java语言,所以给出springboot框架下访问ftp的方法
首先引入pom依赖 Apache Commons net

 <dependency>
     <groupId>commons-net</groupId>
     <artifactId>commons-net</artifactId>
     <version>3.10.0</version> 
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

我这里使用的是最新版,jdk21,可以根据自己的jdk版本适当降低版本,不报错就可以

FTP连接工具类

package com.execute.batch.executebatch.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;

import java.io.*;
import java.time.Duration;



@Slf4j
public class FtpUtil {
    
    public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {
        FTPClient ftpClient = null;
        FileInputStream fis = null;
        try {
            ftpClient = connectAndLogin(host, port, username, password);
            setBinaryFileType(ftpClient);
            ftpClient.setConnectTimeout(1000000000);
            Duration timeout = Duration.ofSeconds(1000000000);
            ftpClient.setDataTimeout(timeout);
            String remoteFileName = localFile.getName();
            fis = new FileInputStream(localFile);
            return ftpClient.storeFile(remoteFileName, fis);
        } catch (IOException e) {
            log.error("上传文件失败", e);
            return false;
        } finally {
            assert ftpClient != null;
            disconnect(ftpClient);
            if(fis != null){
                try {
                    fis.close();
                } catch (IOException e) {
                    log.error("关闭文件流失败", e);
                }
            }
        }
    }

    
    public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {
        FTPClient ftpClient = null;
        FileInputStream fis = null;
        try {
            ftpClient = connectAndLogin(host, port, username, password);
            setBinaryFileType(ftpClient);
            ftpClient.setConnectTimeout(1000000000);
            Duration timeout = Duration.ofSeconds(1000000000);
            ftpClient.setDataTimeout(timeout);
            createRemoteDirectories(ftpClient, remotePath);

            String remoteFileName = localFile.getName();
            String fullRemotePath = remotePath + "/" + remoteFileName;
            fis = new FileInputStream(localFile);
            return ftpClient.storeFile(fullRemotePath, fis);
        } catch (IOException e) {
            log.error("上传文件失败", e);
            return false;
        } finally {
            assert ftpClient != null;
            disconnect(ftpClient);
            if(fis != null){
                try {
                    fis.close();
                } catch (IOException e) {
                    log.error("关闭文件流失败", e);
                }
            }
        }
    }

    
    private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {
        String[] directories = remotePath.split("/");
        String currentPath = "";
        for (String dir : directories) {
            if (!dir.isEmpty()) {
                currentPath += "/" + dir;
                if (!ftpClient.changeWorkingDirectory(currentPath)) {
                    if (!ftpClient.makeDirectory(dir)) {
                        throw new IOException("无法创建远程目录: " + currentPath);
                    }
                    ftpClient.changeWorkingDirectory(dir);
                }
            }
        }
    }

    
    private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {
        FTPClient ftpClient = new FTPClient();
        ftpClient.connect(host, port);
        ftpClient.login(username, password);
        int replyCode = ftpClient.getReplyCode();
        if (!FTPReply.isPositiveCompletion(replyCode)) {
            throw new IOException("连接FTP服务器失败");
        }
        return ftpClient;
    }


    
    private static void disconnect(FTPClient ftpClient) {
        if (ftpClient.isConnected()) {
            try {
                ftpClient.logout();
            } catch (IOException ioe) {
                log.error("登出FTP服务器失败", ioe);
            }
            try {
                ftpClient.disconnect();
            } catch (IOException ioe) {
                log.error("断开FTP服务器连接失败", ioe);
            }
        }
    }

    
    private static void setBinaryFileType(FTPClient ftpClient) {
        try {
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
        } catch (IOException e) {
            throw new RuntimeException("设置传输二进制文件失败", e);
        }
    }

}

主要提供了两个方法uploadFileToRootuploadFileToPath,前者是上传到ftp服务器根目录下,后者上传到指定目录下,其中的连接时间设置的有点夸张,主要是传输时间长、数据量大,害怕断开。

注意:所有涉及到操作文件的流,包括输入流和输出流,使用完了,要及时关闭,否则占用资源不说,还会导致临时生成的文件无法删除。

笔者在ftp服务器下新建了一个文件,测试上传一个txt格式的文本文件,一个上传到根目录下,一个上传到newFile文件夹里
在这里插入图片描述

测试用例代码

package com.execute.batch.executebatch.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;


@Slf4j
public class FtpUtil {

    private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {
        FTPClient ftpClient = new FTPClient();
        ftpClient.connect(host, port);
        ftpClient.login(username, password);
        int replyCode = ftpClient.getReplyCode();
        if (!FTPReply.isPositiveCompletion(replyCode)) {
            throw new IOException("连接FTP服务器失败");
        }
        return ftpClient;
    }

    private static void disconnect(FTPClient ftpClient) {
        if (ftpClient.isConnected()) {
            try {
                ftpClient.logout();
            } catch (IOException ioe) {
                log.error("登出FTP服务器失败", ioe);
            }
            try {
                ftpClient.disconnect();
            } catch (IOException ioe) {
                log.error("断开FTP服务器连接失败", ioe);
            }
        }
    }

    private static void setBinaryFileType(FTPClient ftpClient) {
        try {
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
        } catch (IOException e) {
            throw new RuntimeException("设置传输二进制文件失败", e);
        }
    }

    
    public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {
        FTPClient ftpClient = null;
        try {
            ftpClient = connectAndLogin(host, port, username, password);
            setBinaryFileType(ftpClient);
            String remoteFileName = localFile.getName();
            return ftpClient.storeFile(remoteFileName, new FileInputStream(localFile));
        } catch (IOException e) {
            log.error("上传文件失败", e);
            return false;
        } finally {
            assert ftpClient != null;
            disconnect(ftpClient);
        }
    }

    
    public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {
        FTPClient ftpClient = null;
        try {
            ftpClient = connectAndLogin(host, port, username, password);
            setBinaryFileType(ftpClient);
            createRemoteDirectories(ftpClient, remotePath);

            String remoteFileName = localFile.getName();
            String fullRemotePath = remotePath + "/" + remoteFileName;
            return ftpClient.storeFile(fullRemotePath, new FileInputStream(localFile));
        } catch (IOException e) {
            log.error("上传文件失败", e);
            return false;
        } finally {
            assert ftpClient != null;
            disconnect(ftpClient);
        }
    }

    
    private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {
        String[] directories = remotePath.split("/");
        String currentPath = "";
        for (String dir : directories) {
            if (!dir.isEmpty()) {
                currentPath += "/" + dir;
                if (!ftpClient.changeWorkingDirectory(currentPath)) {
                    if (!ftpClient.makeDirectory(dir)) {
                        throw new IOException("无法创建远程目录: " + currentPath);
                    }
                    ftpClient.changeWorkingDirectory(dir);
                }
            }
        }
    }
}

执行后查看ftp服务器
在这里插入图片描述
发现根目录下和文件夹下都有上传的文件了
在这里插入图片描述

批量数据生成

笔者这里只模拟生成500万条数据,供测试使用

批处理工具类

package com.execute.batch.executebatch.utils;

import jakarta.annotation.Resource;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.function.BiFunction;

@Component
public class BatchInsertUtil {

    @Resource
    private final SqlSessionFactory sqlSessionFactory;

    public BatchInsertUtil(SqlSessionFactory sqlSessionFactory) {
        this.sqlSessionFactory = sqlSessionFactory;
    }

    
    @SuppressWarnings("all")
    public <T,U,R> int batchInsert(List<T> entityList, Class<U> mapperClass, BiFunction<T,U,R> function) {
        int i = 1;
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
        try {
            U mapper = sqlSession.getMapper(mapperClass);
            for (T entity : entityList) {
               function.apply(entity,mapper);
               i++;
            }
            sqlSession.flushStatements();
            sqlSession.commit();
        } catch (Exception e) {
            throw new RuntimeException("批量插入数据失败", e);
        }finally {
            sqlSession.close();
        }
        return i-1;
    }
}

跑批数据

package com.execute.batch.executebatch;

import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.BatchInsertUtil;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


@Component
public class DataSeeder implements CommandLineRunner {

    @Resource
    private ApplicationContext applicationContext;

    private ExecutorService executorService;
    private static final int TOTAL_RECORDS = 5000000;
    private static final int BATCH_SIZE = 10000;
    private static final int THREAD_POOL_SIZE = 10;

    @PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    }

    @Override
    public void run(String... args) {
        long startTime = System.currentTimeMillis();

        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < TOTAL_RECORDS; i += BATCH_SIZE) {
            int finalI = i;
            tasks.add(() -> insertBatch(finalI, BATCH_SIZE));
        }

        tasks.forEach(executorService::execute);
        executorService.shutdown();
        long endTime = System.currentTimeMillis();
        System.out.println("Total time taken: " + (endTime - startTime) / 1000 + " seconds.");
    }

    public void insertBatch(int startId, int batchSize) {
        List<User> batch = new ArrayList<>(batchSize);
        Random random = new Random();
        for (int i = 0; i < batchSize; i++) {
            User user = createUser(startId + i, random);
            batch.add(user);
            System.out.println(user);
        }
        BatchInsertUtil util = new BatchInsertUtil(applicationContext.getBean(SqlSessionFactory.class));
        util.batchInsert(batch, UserMapper.class, (item,mapper)-> mapper.insertBatch(item));
    }

    private User createUser(int id, Random random) {
        User user = new User();
        user.setId(id);
        user.setName("User" + id);
        user.setEmail("user" + id + "@example.com");
        user.setAge(random.nextInt(80) + 20); 
        user.setAddress("Address" + id);
        user.setPhoneNumber("1234567890"); 
        return user;
    }
}

整个生成过程是十分漫长的,40分钟左右,数据查询结果生成了500万条数据
在这里插入图片描述

测试上传ftp

下面展示的两个是mybatis手动分页的写法,如果有其他查询参数,则可以建一个实体类,把rowbounds参数囊括进去作为一个属性即可

mapper接口层

在这里插入图片描述

mybatis的xml

在这里插入图片描述

测试用例

package com.execute.batch.executebatch.controller;

import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.FtpUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.RowBounds;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiConsumer;



@RestController
@RequestMapping("/FTP")
@Slf4j
public class FTPController {

    @Resource
    private UserMapper userMapper;

    private final Object lock = new Object();

    @GetMapping(value = "/upload")
    public void upload() throws InterruptedException {
        String host = "192.168.1.103";
        int port = 21; 
        String username = "hulei";
        String password = "hulei";
        int pageSize = 450000;
        int offset = 0;
        int uploadCycle = 0;
        int totalUploaded = 0;
        boolean noData = false;
        while (true) {
            uploadCycle++;
            
            File tempFile = new File("D:/FTPFile", "user_data_" + uploadCycle + ".txt");
            while (true) {
                RowBounds rowBounds = new RowBounds(offset, pageSize);
                List<User> list = userMapper.queryBatch(rowBounds);
                if (!list.isEmpty()) {
                    MultiThreadWriteToFile(list, tempFile, getConsumer());
                    offset += pageSize;
                    totalUploaded += list.size();
                }
                if (list.isEmpty()) {
                    noData = true;
                    break;
                }
                
                if (totalUploaded >= 600000) {
                    break;
                }
            }
            
            if(!tempFile.exists()){
                break;
            }
            boolean uploadSuccess = FtpUtil.uploadFileToRoot(host, port, username, password, tempFile);
            if (uploadSuccess) {
                System.out.println("文件上传成功");
            } else {
                System.out.println("文件上传失败");
            }
            System.out.println("上传完成,已上传" + uploadCycle + "个批次");
            totalUploaded = 0;
            if (noData) {
                break;
            }
        }
    }


    private <T> void MultiThreadWriteToFile(List<T> list, File tempFile, BiConsumer<BufferedWriter, T> writeItemConsumer) throws InterruptedException {
        Path filePath = tempFile.toPath(); 
        try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, 
                StandardOpenOption.WRITE, 
                StandardOpenOption.APPEND)) { 
            ExecutorService executor = Executors.newFixedThreadPool(10); 
            BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(list.size()); 
            for (int i = 0; i < list.size(); i++) { 
                taskQueue.add(i);
            }
            for (int i = 0; i < list.size(); i++) { 
                int index = taskQueue.take();
                executor.submit(() -> writeItemConsumer.accept(writer, list.get(index)));
            }
            executor.shutdown(); 
            boolean terminated = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
            if (!terminated) { 
                log.warn("线程池关闭超时");
            }
        } catch (IOException e) { 
            log.error("创建或写入文件发生错误: {},异常为: {}", tempFile.getAbsolutePath(), e.getMessage());
        }
    }

    private BiConsumer<BufferedWriter, User> getConsumer() {
        return (writer, item) -> {
            String str = String.join("|",
                    String.valueOf(item.getId()),
                    item.getName(),
                    item.getEmail(),
                    String.valueOf(item.getAge()),
                    item.getAddress(),
                    item.getPhoneNumber()
            );
            log.info("告警入湖数据拼接字符串:{}", str);
            try {
                synchronized (lock) {
                    writer.write(str);
                    writer.newLine();
                }
            } catch (IOException e) {
                log.error("写入告警入湖数据发生异常: {}", e.getMessage());
            }
        };
    }
}

简单分析下:分页查询数据,每次查询pageSize条数据,写入一个txt文件,当写入的总条数超过totalUpload时,就跳出内部while循环,上传当前txt文件。然后进入第二次外层while循环,创建第二个txt文件,内部循环分页查询数据写入第二个txt文件。。。以此类推,直至最后查不出数据为止。

注意:pageSize和totalUpload最好是倍数关系,比如pageSize = 50000,那么totalUpload最好是pageSize 的整数倍,如100000,150000,200000,这样可以保证当文件数较多时,大部分的文件中数据条数一样。

以下是我分批上传到ftp服务器的文件,一共500万条数据,字段做了处理,使用 | 拼接
在这里插入图片描述
在这里插入图片描述
写入的数据是乱序的,要求顺序写入的话,就不要使用多线程了。

文章知识点与官方知识档案匹配,可进一步学习相关知识
赞(0) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 » 大数据量上传FTP

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏