文件上传与文件接收服务

2021-03-17 01:27

阅读:626

标签:task   工厂   保存文件   dna   压缩   inter   async   executor   direct   

#文件上传

1、存在形式:web服务,可以跨平台部署

2、文件监控:使用apache下commons-io.jar包,继承FileAlterationListenerAdaptor类定义一个监听器,创建FileAlterationObserver观察者,将监听器注入到观察者,当文件发生变化,观察者会调用监听器的方法。

FileAlterationListenerAdaptor和FileAlterationObserver都属于org.apache.commons.io.monitor 包下的类,这个包的作用是监控指定目录下的文件状态,它使用观察者设计模式设计这些类的关系。

     a、可设置监听路径

     b、可设置监听间隔

     c、可设置监听指定格式,支持开启和关闭配置,如果开启了指定格式,最终文件只上传指定格式的文件

3、扫描功能:为了解决历史文件问题

     a、可以设置扫描频率,即每隔多久扫一次,只适合移动文件模式

     b、支持开启和关闭配置

     c、扫描到的文件采用并行流(利用CPU的多核)的方式,将任务交给线程池去执行

4、文件上传:

     a、可以设置上传时间段

     b、上传模式[0-复制文件 1-移动文件]

     c、采用feign+okhttp完成,稳定高效,通过连接池来减小响应延迟,还有透明的GZIP压缩,请求缓存等优势。

     d、使用线程池,异步多线程去执行,提高执行效率

 

#pom配置


org.springframework.boot
        spring-boot-starter-tomcat
        providedorg.apache.tomcat.embed
        tomcat-embed-jasper
        provided

 

#配置文件

server.port=8000
server.servlet.context-path=/nb-file-server
 
spring.servlet.multipart.max-request-size=1024MB
spring.servlet.multipart.max-file-size=1024MB
 
file.upload.path=C:\\test\\tmp3

 

#控制层

@RestController
@RequestMapping("/file")
public class FileController {
 
    @Value("${file.upload.path}")
    private String path;
 
    @PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public boolean upload(@RequestPart("file") MultipartFile file) {
        if (file == null) {
            return false;
        }
 
        try {
            File parent = new File(path);
            if (!parent.exists()) {
                boolean mkdirs = parent.mkdirs();
                if (!mkdirs) {
                    return false;
                }
            }
 
            // 保存文件
            file.transferTo(new File(parent.getAbsolutePath() + File.separator + file.getOriginalFilename()));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
 
        return false;
    }
 
}

 

#启动类

@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
public class FileServerApplication extends SpringBootServletInitializer {
 
    public static void main(String[] args) {
        SpringApplication.run(FileServerApplication.class, args);
    }
 
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(FileServerApplication.class);
    }
 
}

 

#文件接收

1、存在形式:web服务,可以跨平台部署

2、自定义配置

      a、可设置文件保存路径

      b、可设置单个文件大小

      c、可设置单次请求的文件总大小

 

#pom配置


org.springframework.boot
        spring-boot-starter-tomcat
        providedorg.apache.tomcat.embed
        tomcat-embed-jasper
        providedorg.springframework.cloud
        spring-cloud-starter-openfeign
    io.github.openfeign
        feign-okhttp
    org.springframework
        spring-test
    

 

#配置文件

server.port=8001
feign.okhttp.enabled=true
logging.level.com.nubomed.apiservice.client=debug
#文件上传目标服务器地址
endpoint.file.server=http://192.168.1.220:8000/nb-file-server
 
#监听路径
monitor.dir=C:\\test\\tmp
#监听间隔,单位毫秒
monitor.interval=5000
#是否监听指定文件格式
monitor.file.suffix.enable=false
#监听文件格式
monitor.file.suffix=.txt
 
#上传文件时间段
file.upload.time.slot=00:00-08:00
#0-复制文件 1-移动文件
file.operate.mode=1
#是否开启主动扫描功能,[0-复制文件]模式下,不会执行文件上传操作
scanner.enable=true
#扫描频率,多少分钟执行一次
scanner.rate=1

 

#配置类-线程池

@Configuration
@EnableAsync
public class ExecutorConfig {
 
    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(5);
        //配置最大线程数
        executor.setMaxPoolSize(10);
        //配置队列大小
        executor.setQueueCapacity(1000);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("nb-file-client-async-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
 
}

 

#配置类-feign

@Configuration
public class FeignConfig {
 
    @Bean
    Logger.Level feignLoggerLevel() {
        //记录请求和响应的标头,正文和元数据
        return Logger.Level.FULL;
    }
 
}

 

#文件监听器

public class FileListener extends FileAlterationListenerAdaptor {
    private ListenerService listenerService;
 
    public FileListener(ListenerService listenerService) {
        this.listenerService = listenerService;
    }
 
    @Override
    public void onFileCreate(File file) {
        listenerService.handleFileCreate(file);
    }
 
    @Override
    public void onFileChange(File file) {
        listenerService.handleFileChange(file);
    }
 
    @Override
    public void onFileDelete(File file) {
    }
 
    @Override
    public void onDirectoryCreate(File directory) {
    }
 
    @Override
    public void onDirectoryChange(File directory) {
    }
 
    @Override
    public void onDirectoryDelete(File directory) {
    }
 
    @Override
    public void onStart(FileAlterationObserver observer) {
    }
 
    @Override
    public void onStop(FileAlterationObserver observer) {
    }
}

 

#文件监听工厂类

@Component
public class FileListenerFactory {
 
    @Value("${monitor.dir}")
    private String monitorDir;
 
    @Value("${monitor.interval}")
    private long interval;
 
    @Value("${monitor.file.suffix.enable}")
    private boolean enable;
 
    @Value("${monitor.file.suffix}")
    private String suffix;
 
    @Autowired
    private ListenerService listenerService;
 
    public FileAlterationMonitor getMonitor() {
        FileAlterationObserver observer = null;
 
        // 创建过滤器
        if (enable) {
            IOFileFilter directories = FileFilterUtils.and(FileFilterUtils.directoryFileFilter(), HiddenFileFilter.VISIBLE);
            IOFileFilter files = FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(suffix));
            IOFileFilter filter = FileFilterUtils.or(directories, files);
            //装配过滤器
            observer = new FileAlterationObserver(new File(monitorDir), filter);
        } else {
            observer = new FileAlterationObserver(new File(monitorDir));
        }
 
        // 向监听者添加监听器,并注入业务服务
        observer.addListener(new FileListener(listenerService));
 
        // 返回监听者
        return new FileAlterationMonitor(interval, observer);
    }
 
}

 

#文件监听服务接口

public interface ListenerService {
 
    /**
     * 监听文件创建
     *
     * @param file 创建的文件
     */
    void handleFileCreate(File file);
 
    /**
     * 监听文件修改
     *
     * @param file 修改的文件
     */
    void handleFileChange(File file);
 
    /**
     * 执行文件扫描
     */
    void handleScanner();
}

 

#文件监听服务接口实现

@Service
@Slf4j
public class ListenerServiceImpl implements ListenerService {
 
    @Resource
    private AsyncService asyncService;
 
    @Value("${file.operate.mode}")
    private Integer mode;
 
    @Value("${file.upload.time.slot}")
    private String timeSlot;
 
    @Value("${monitor.dir}")
    private String dir;
 
    @Value("${monitor.file.suffix.enable}")
    private boolean enableSuffix;
 
    @Value("${monitor.file.suffix}")
    private String suffix;
 
    @Override
    public void handleFileCreate(File file) {
        log.info("发现新的文件[{}]...", file.getName());
        if (isHandleTimeSlot()) {
            //asyncService.handleFileUpload(file);
        }
    }
 
    @Override
    public void handleFileChange(File file) {
        log.info("发现文件[{}]修改了...", file.getName());
        if (isHandleTimeSlot()) {
            asyncService.handleFileUpload(file);
        }
    }
 
    @Override
    public void handleScanner() {
        if (mode == 0) {
            log.info("[复制文件模式]不执行扫描操作!");
            return;
        }
 
        log.info("开始扫描目录[{}]文件...", dir);
        File file = new File(dir);
        File[] files = file.listFiles();
        if (files == null || files.length == 0) {
            log.info("目录[{}]下没有发现文件!", dir);
            return;
        }
 
        log.info("已扫描到[{}]个文件", files.length);
 
        if (enableSuffix) {
            log.info("已开启扫描[{}]格式文件", suffix);
            List fileList = Arrays.stream(files)
                    .filter(file1 -> file1.getName().contains(suffix)).collect(Collectors.toList());
            log.info("[{}]格式文件有[{}]个", suffix, fileList.size());
            fileList.parallelStream().forEach(file12 -> asyncService.handleFileUpload(file12));
            return;
        }
 
        Arrays.stream(files).parallel().forEach(file13 -> asyncService.handleFileUpload(file13));
    }
 
    private LocalDateTime parseStringToDateTime(String time) {
        String[] split = time.split(":");
        return LocalDateTime.of(LocalDate.now(), LocalTime.of(Integer.valueOf(split[0]), Integer.valueOf(split[1]), 0));
    }
 
    private boolean isHandleTimeSlot() {
        String[] split = timeSlot.split("-");
        LocalDateTime startTime = parseStringToDateTime(split[0]);
        LocalDateTime endTime = parseStringToDateTime(split[1]);
        LocalDateTime now = LocalDateTime.now();
        if (now.isBefore(startTime) || now.isAfter(endTime)) {
            log.info("文件上传的时间段为[{}]", timeSlot);
            return false;
        }
 
        return true;
    }
 
}

 

#文件上传服务接口

public interface AsyncService {
 
    /**
     * 执行异步任务-上传文件
     *
     * @param file 目标文件
     */
    void handleFileUpload(File file);
}

 

#文件上传服务实现

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
 
    private static final Map PROCESS_MAP = new ConcurrentHashMap();
 
    @Resource
    private FileUploadClient fileUploadClient;
 
    @Value("${file.operate.mode}")
    private Integer mode;
 
    @Override
    @Async("asyncServiceExecutor")
    public void handleFileUpload(File file) {
        log.info("当前线程[{}]", Thread.currentThread().getName());
 
        if (PROCESS_MAP.get(file.getName()) != null && PROCESS_MAP.get(file.getName())) {
            log.info("文件[{}]正在被处理中...", file.getName());
            return;
        }
        PROCESS_MAP.put(file.getName(), true);
 
        if (!file.exists()) {
            log.info("文件[{}]已被处理!", file.getName());
            return;
        }
 
        long start = System.currentTimeMillis();
        log.info("文件[{}]正在上传...", file.getName());
        boolean result = false;
 
        try {
            MultipartFile multipartFile = new MockMultipartFile("file", file.getName(),
                    MediaType.MULTIPART_FORM_DATA_VALUE, new FileInputStream(file));
            result = fileUploadClient.upload(multipartFile);
 
            if (result) {
                //移动文件
                if (mode == 1) {
                    log.info("开始删除文件[{}]...", file.getName());
                    boolean delete = file.delete();
                    if (delete) {
                        log.info("文件[{}]删除成功!", file.getName());
                    } else {
                        log.error("文件[{}]删除失败!", file.getName());
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
 
        long end = System.currentTimeMillis();
        long cost = end - start;
        if (result) {
            log.info("文件[{}]上传成功!耗时[{}]ms", file.getName(), cost);
        } else {
            log.error("文件[{}]上传失败!耗时[{}]ms", file.getName(), cost);
        }
        PROCESS_MAP.remove(file.getName());
    }
 
}

 

#启动类

@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
@EnableFeignClients
@EnableScheduling
@Slf4j
public class FileClientApplication extends SpringBootServletInitializer implements CommandLineRunner {
 
    @Autowired
    private FileListenerFactory fileListenerFactory;
 
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
 
    @Autowired
    private ListenerService listenerService;
 
    @Value("${scanner.enable}")
    private boolean enable;
 
    @Value("${scanner.rate}")
    private String rate;
 
    public static void main(String[] args) {
        SpringApplication.run(FileClientApplication.class, args);
    }
 
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(FileClientApplication.class);
    }
 
    @Override
    public void run(String... args) throws Exception {
        try {
            // 创建监听者
            FileAlterationMonitor fileAlterationMonitor = fileListenerFactory.getMonitor();
            fileAlterationMonitor.start();
 
            if (enable) {
                log.info("启动主动扫描功能,扫描频率[{}]分钟执行一次", rate);
                String cron = String.format("0/59 0/%s * * * ?", rate);
                threadPoolTaskScheduler.schedule(() -> listenerService.handleScanner(), new CronTrigger(cron));
            } else {
                log.info("未启动主动扫描功能,原因[scanner.enable={}]", enable);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

文件上传与文件接收服务

标签:task   工厂   保存文件   dna   压缩   inter   async   executor   direct   

原文地址:https://www.cnblogs.com/lushichao/p/13041876.html


评论


亲,登录后才可以留言!