文件上传与文件接收服务
2021-03-17 01:27
标签:task 工厂 保存文件 dna 压缩 inter async executor direct #文件上传 1、存在形式:web服务,可以跨平台部署 2、文件监控:使用apache下commons-io.jar包,继承FileAlterationListenerAdaptor类定义一个监听器,创建FileAlterationObserver观察者,将监听器注入到观察者,当文件发生变化,观察者会调用监听器的方法。 FileAlterationListenerAdaptor和FileAlterationObserver都属于org.apache.commons.io.monitor 包下的类,这个包的作用是监控指定目录下的文件状态,它使用观察者设计模式设计这些类的关系。 a、可设置监听路径 b、可设置监听间隔 c、可设置监听指定格式,支持开启和关闭配置,如果开启了指定格式,最终文件只上传指定格式的文件 3、扫描功能:为了解决历史文件问题 a、可以设置扫描频率,即每隔多久扫一次,只适合移动文件模式 b、支持开启和关闭配置 c、扫描到的文件采用并行流(利用CPU的多核)的方式,将任务交给线程池去执行 4、文件上传: a、可以设置上传时间段 b、上传模式[0-复制文件 1-移动文件] c、采用feign+okhttp完成,稳定高效,通过连接池来减小响应延迟,还有透明的GZIP压缩,请求缓存等优势。 d、使用线程池,异步多线程去执行,提高执行效率 #pom配置 #配置文件 #控制层 #启动类 #文件接收 1、存在形式:web服务,可以跨平台部署 2、自定义配置 a、可设置文件保存路径 b、可设置单个文件大小 c、可设置单次请求的文件总大小 #pom配置 #配置文件 #配置类-线程池 #配置类-feign #文件监听器 #文件监听工厂类 #文件监听服务接口 #文件监听服务接口实现 #文件上传服务接口 #文件上传服务实现 #启动类 文件上传与文件接收服务 标签:task 工厂 保存文件 dna 压缩 inter async executor direct 原文地址:https://www.cnblogs.com/lushichao/p/13041876.html
server.port=8000
server.servlet.context-path=/nb-file-server
spring.servlet.multipart.max-request-size=1024MB
spring.servlet.multipart.max-file-size=1024MB
file.upload.path=C:\\test\\tmp3
@RestController
@RequestMapping("/file")
public class FileController {
@Value("${file.upload.path}")
private String path;
@PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public boolean upload(@RequestPart("file") MultipartFile file) {
if (file == null) {
return false;
}
try {
File parent = new File(path);
if (!parent.exists()) {
boolean mkdirs = parent.mkdirs();
if (!mkdirs) {
return false;
}
}
// 保存文件
file.transferTo(new File(parent.getAbsolutePath() + File.separator + file.getOriginalFilename()));
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
public class FileServerApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(FileServerApplication.class, args);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
return builder.sources(FileServerApplication.class);
}
}
server.port=8001
feign.okhttp.enabled=true
logging.level.com.nubomed.apiservice.client=debug
#文件上传目标服务器地址
endpoint.file.server=http://192.168.1.220:8000/nb-file-server
#监听路径
monitor.dir=C:\\test\\tmp
#监听间隔,单位毫秒
monitor.interval=5000
#是否监听指定文件格式
monitor.file.suffix.enable=false
#监听文件格式
monitor.file.suffix=.txt
#上传文件时间段
file.upload.time.slot=00:00-08:00
#0-复制文件 1-移动文件
file.operate.mode=1
#是否开启主动扫描功能,[0-复制文件]模式下,不会执行文件上传操作
scanner.enable=true
#扫描频率,多少分钟执行一次
scanner.rate=1
@Configuration
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(5);
//配置最大线程数
executor.setMaxPoolSize(10);
//配置队列大小
executor.setQueueCapacity(1000);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("nb-file-client-async-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
@Configuration
public class FeignConfig {
@Bean
Logger.Level feignLoggerLevel() {
//记录请求和响应的标头,正文和元数据
return Logger.Level.FULL;
}
}
public class FileListener extends FileAlterationListenerAdaptor {
private ListenerService listenerService;
public FileListener(ListenerService listenerService) {
this.listenerService = listenerService;
}
@Override
public void onFileCreate(File file) {
listenerService.handleFileCreate(file);
}
@Override
public void onFileChange(File file) {
listenerService.handleFileChange(file);
}
@Override
public void onFileDelete(File file) {
}
@Override
public void onDirectoryCreate(File directory) {
}
@Override
public void onDirectoryChange(File directory) {
}
@Override
public void onDirectoryDelete(File directory) {
}
@Override
public void onStart(FileAlterationObserver observer) {
}
@Override
public void onStop(FileAlterationObserver observer) {
}
}
@Component
public class FileListenerFactory {
@Value("${monitor.dir}")
private String monitorDir;
@Value("${monitor.interval}")
private long interval;
@Value("${monitor.file.suffix.enable}")
private boolean enable;
@Value("${monitor.file.suffix}")
private String suffix;
@Autowired
private ListenerService listenerService;
public FileAlterationMonitor getMonitor() {
FileAlterationObserver observer = null;
// 创建过滤器
if (enable) {
IOFileFilter directories = FileFilterUtils.and(FileFilterUtils.directoryFileFilter(), HiddenFileFilter.VISIBLE);
IOFileFilter files = FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(suffix));
IOFileFilter filter = FileFilterUtils.or(directories, files);
//装配过滤器
observer = new FileAlterationObserver(new File(monitorDir), filter);
} else {
observer = new FileAlterationObserver(new File(monitorDir));
}
// 向监听者添加监听器,并注入业务服务
observer.addListener(new FileListener(listenerService));
// 返回监听者
return new FileAlterationMonitor(interval, observer);
}
}
public interface ListenerService {
/**
* 监听文件创建
*
* @param file 创建的文件
*/
void handleFileCreate(File file);
/**
* 监听文件修改
*
* @param file 修改的文件
*/
void handleFileChange(File file);
/**
* 执行文件扫描
*/
void handleScanner();
}
@Service
@Slf4j
public class ListenerServiceImpl implements ListenerService {
@Resource
private AsyncService asyncService;
@Value("${file.operate.mode}")
private Integer mode;
@Value("${file.upload.time.slot}")
private String timeSlot;
@Value("${monitor.dir}")
private String dir;
@Value("${monitor.file.suffix.enable}")
private boolean enableSuffix;
@Value("${monitor.file.suffix}")
private String suffix;
@Override
public void handleFileCreate(File file) {
log.info("发现新的文件[{}]...", file.getName());
if (isHandleTimeSlot()) {
//asyncService.handleFileUpload(file);
}
}
@Override
public void handleFileChange(File file) {
log.info("发现文件[{}]修改了...", file.getName());
if (isHandleTimeSlot()) {
asyncService.handleFileUpload(file);
}
}
@Override
public void handleScanner() {
if (mode == 0) {
log.info("[复制文件模式]不执行扫描操作!");
return;
}
log.info("开始扫描目录[{}]文件...", dir);
File file = new File(dir);
File[] files = file.listFiles();
if (files == null || files.length == 0) {
log.info("目录[{}]下没有发现文件!", dir);
return;
}
log.info("已扫描到[{}]个文件", files.length);
if (enableSuffix) {
log.info("已开启扫描[{}]格式文件", suffix);
List
public interface AsyncService {
/**
* 执行异步任务-上传文件
*
* @param file 目标文件
*/
void handleFileUpload(File file);
}
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
private static final Map
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
@EnableFeignClients
@EnableScheduling
@Slf4j
public class FileClientApplication extends SpringBootServletInitializer implements CommandLineRunner {
@Autowired
private FileListenerFactory fileListenerFactory;
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Autowired
private ListenerService listenerService;
@Value("${scanner.enable}")
private boolean enable;
@Value("${scanner.rate}")
private String rate;
public static void main(String[] args) {
SpringApplication.run(FileClientApplication.class, args);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
return builder.sources(FileClientApplication.class);
}
@Override
public void run(String... args) throws Exception {
try {
// 创建监听者
FileAlterationMonitor fileAlterationMonitor = fileListenerFactory.getMonitor();
fileAlterationMonitor.start();
if (enable) {
log.info("启动主动扫描功能,扫描频率[{}]分钟执行一次", rate);
String cron = String.format("0/59 0/%s * * * ?", rate);
threadPoolTaskScheduler.schedule(() -> listenerService.handleScanner(), new CronTrigger(cron));
} else {
log.info("未启动主动扫描功能,原因[scanner.enable={}]", enable);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
上一篇:Mybatis报错org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)问题解决办法汇总
下一篇:webGL 投影