Spring Boot:Spring Boot整合FTPClient线程池

By admin in Python on 2019年10月6日

最近在写一个FTP上传工具,用到了Apache的FTPClient,但是每个线程频繁地创建和销毁FTPClient对象对服务器的压力很大,因此,此处最好使用一个FTPClient连接池。仔细翻了一下Apache的api,发现它并没有一个FTPClientPool的实现,所以,不得不自己写一个FTPClientPool。下面就大体介绍一下开发连接池的整个过程,供大家参考。我们可以利用Apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool包中的ObjectPool和PoolableObjectFactory两个接口即可。

Spring线程池结合Spring托管线程Bean

@Component 注解声明该类为Spring托管的Bean

@Scope("prototype")  注解声明该Bean为"多例"(每次获取都会创建一个新实例)

 

package com.test.thread;



import org.springframework.context.annotation.Scope;

import org.springframework.stereotype.Component;



@Component

@Scope("prototype")

public class PrintTask implements Runnable {

        String name;



        public void setName(String name) {

                this.name = name;

        }



        @Override

        public void run(){

                System.out.println(name + " is running.");

                try{

                        Thread.sleep(5000);

                }catch(InterruptedException e){

                        e.printStackTrace();

                }

                System.out.println(name + " is running again.");

        }

}

 

package com.test.config;



import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.ComponentScan;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;



@Configuration

// 表示所扫描所包含的@Bean

@ComponentScan(basePackages="com.test.thread")

public class AppConfig {

        @Bean

        public ThreadPoolTaskExecutor taskExecutor(){

                ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

                pool.setCorePoolSize(5); //线程池活跃的线程数

                pool.setMaxPoolSize(10); //线程池最大活跃的线程数 

                pool.setQueueCapacity(25); // 队列的最大容量 

                pool.setWaitForTasksToCompleteOnShutdown(true);

                return pool;

        }

}

 

package com.test;



import org.springframework.context.ApplicationContext;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;



import com.chszs.config.AppConfig;

import com.chszs.thread.PrintTask;



public class App {

        public static void main(String[] args) {

                ApplicationContext ctx = 

            new AnnotationConfigApplicationContext(AppConfig.class);

                ThreadPoolTaskExecutor taskExecutor =

            (ThreadPoolTaskExecutor)ctx.getBean("taskExecutor");



                PrintTask2 printTask1 = (PrintTask2)ctx.getBean("printTask");

                printTask1.setName("Thread 1");

                taskExecutor.execute(printTask1);



                PrintTask2 printTask2 = (PrintTask2)ctx.getBean("printTask");

                printTask2.setName("Thread 2");

                taskExecutor.execute(printTask2);



                PrintTask2 printTask3 = (PrintTask2)ctx.getBean("printTask");

                printTask3.setName("Thread 3");

                taskExecutor.execute(printTask3);



                for(;;){

                        int count = taskExecutor.getActiveCount();

                        System.out.println("Active Threads : " + count);

                        try{

                                Thread.sleep(1000);

                        }catch(InterruptedException e){

                                e.printStackTrace();

                        }

                        if(count==0){

                                taskExecutor.shutdown();

                                break;

                        }

                }

        }



}

 备注:如果PrintTask类中需要操作dao、service,而@Autowired注入时找不到对应的类,那就要留意@ComponentScan(basePackages="com.test.thread")的配置是否有问题了~basePackages应当覆盖所有需要被扫描的Bean所在的包。

线程池的意思

为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便地使用对象池中的对象。

Spring线程池结合非Spring托管Bean。

package com.chszs.thread;



public class PrintTask implements Runnable{

        String name;

        public PrintTask(String name){

                this.name = name;

        }

        @Override

        public void run() {

                System.out.println(name + " is running.");

                try{

                        Thread.sleep(5000);

                }catch(InterruptedException e){

                        e.printStackTrace();

                }

                System.out.println(name + " is running again.");

        }



}

 

pom引入依赖

 <!-- FtpClient依赖包--> <dependency> <groupId>commons-net</groupId> <artifactId>commons-net</artifactId> <version>3.5</version> </dependency> <!-- 线程池--> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency>

Spring-Config.xml

<beans xmlns="http://www.springframework.org/schema/beans"

        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 

        xmlns:context="http://www.springframework.org/schema/context"

        xsi:schemaLocation="http://www.springframework.org/schema/beans

        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd

        http://www.springframework.org/schema/context

        http://www.springframework.org/schema/context/spring-context-3.1.xsd">



        <bean id="taskExecutor" 

        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

                <property name="corePoolSize" value="5" />

                <property name="maxPoolSize" value="10" />

                <property name="WaitForTasksToCompleteOnShutdown" value="true" />

        </bean>

</beans>

 

package com.chszs;



import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;



import com.chszs.thread.PrintTask;



public class App1 {



        public static void main(String[] args) {

                ApplicationContext ctx = 

            new ClassPathXmlApplicationContext("resources/Spring-Config.xml");

                ThreadPoolTaskExecutor taskExecutor =

            (ThreadPoolTaskExecutor)ctx.getBean("taskExecutor");

                taskExecutor.execute(new PrintTask("Thread 1"));

                taskExecutor.execute(new PrintTask("Thread 2"));

                taskExecutor.execute(new PrintTask("Thread 3"));

                taskExecutor.execute(new PrintTask("Thread 4"));

                taskExecutor.execute(new PrintTask("Thread 5"));

                // 检查活动的线程,如果活动线程数为0则关闭线程池

                for(;;){

                        int count = taskExecutor.getActiveCount();

                        System.out.println("Active Threads : " + count);

                        try{

                                Thread.sleep(1000);

                        }catch(InterruptedException e){

                                e.printStackTrace();

                        }

                        if(count==0){

                                taskExecutor.shutdown();

                                break;

                        }

                }

        }



}

创建ftp配置信息

resources目录下创建ftp.properties配置文件,目录结构如下:

图片 1image.png

添加如下的配置信息:

########### FTP用户名称 ###########
ftp.userName=hrabbit
########### FTP用户密码 ###########
ftp.passWord=123456
########### FTP主机IP ###########
ftp.host=127.0.0.1
########### FTP主机端口号 ###########
ftp.port=21
########### 保存根路径 ###########
ftp.baseUrl=/

Spring结合Java线程

通过继承Thread创建一个简单的Java线程,然后使用@Component让Spring容器管理此线程,Bean的作用域必须是prototype,这样每次请求都会返回一个新实例,运行各自独立的线程。

package com.chszs.thread;



import org.springframework.stereotype.Component;

import org.springframework.context.annotation.Scope;



@Component

@Scope("prototype")

public class PrintThread extends Thread{

        @Override

        public void run(){

                System.out.println(getName() + " is running.");

                try{

                        Thread.sleep(5000);

                }catch(InterruptedException e){

                        e.printStackTrace();

                }

                System.out.println(getName() + " is running again.");

        }

}

 

package com.chszs;

import org.springframework.context.ApplicationContext;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;



import com.chszs.config.AppConfig;

import com.chszs.thread.PrintThread;



public class App {

        public static void main(String[] args){

                ApplicationContext ctx = 

            new AnnotationConfigApplicationContext(AppConfig.class);

                PrintThread printThread1 = (PrintThread)ctx.getBean("printThread");

                printThread1.setName("Thread 1");



                PrintThread printThread2 = (PrintThread)ctx.getBean("printThread");

                printThread2.setName("Thread 2");



                PrintThread printThread3 = (PrintThread)ctx.getBean("printThread");

                printThread3.setName("Thread 3");



                PrintThread printThread4 = (PrintThread)ctx.getBean("printThread");

                printThread4.setName("Thread 4");



                PrintThread printThread5 = (PrintThread)ctx.getBean("printThread");

                printThread5.setName("Thread 5");



                printThread1.start();

                printThread2.start();

                printThread3.start();

                printThread4.start();

                printThread5.start();

        }

}

 

package com.chszs.config;



import org.springframework.context.annotation.ComponentScan;

import org.springframework.context.annotation.Configuration;



@Configuration

@ComponentScan(basePackages="com.chszs.thread")

public class AppConfig {

}

 所依赖的jar

org.springframework.aop-3.1.3.RELEASE.jar
org.springframework.asm-3.1.3.RELEASE.jar
org.springframework.beans-3.1.3.RELEASE.jar
org.springframework.context-3.1.3.RELEASE.jar
org.springframework.core-3.1.3.RELEASE.jar
org.springframework.expression-3.1.3.RELEASE.jar
commons-logging.jar
aopalliance-1.0.jar
asm-3.3.1.jar
cglib-2.2.2.jar

另外附上线程池的方法说明(以下签名中出现了edu.emory.mathcs.backport包,看起来摘自基于backport-concurrent的ThreadPoolTaskExecutor文档),具体作用等你发掘,欢迎拍砖~

protected edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue createQueue(int queueCapacity)

//Create the BlockingQueue to use for the ThreadPoolExecutor.

 void   destroy()

//Calls shutdown when the BeanFactory destroys the task executor instance.

 void   execute(Runnable task)

//Implementation of both the JSR-166 backport Executor interface and the Spring TaskExecutor interface, delegating to the ThreadPoolExecutor instance.

 int    getActiveCount()

//Return the number of currently active threads.

 int    getCorePoolSize()

//Return the ThreadPoolExecutor's core pool size.

 int    getKeepAliveSeconds()

//Return the ThreadPoolExecutor's keep-alive seconds.

 int    getMaxPoolSize()

//Return the ThreadPoolExecutor's maximum pool size.

 int    getPoolSize()

//Return the current pool size.

 edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor  getThreadPoolExecutor()

//Return the underlying ThreadPoolExecutor for native access.

 void   initialize()

//Creates the BlockingQueue and the ThreadPoolExecutor.

 boolean    prefersShortLivedTasks()

//This task executor prefers short-lived work units.

 void   setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut)

//Specify whether to allow core threads to time out.

 void   setBeanName(String name)

//Set the name of the bean in the bean factory that created this bean.

 void   setCorePoolSize(int corePoolSize)

//Set the ThreadPoolExecutor's core pool size.

 void   setKeepAliveSeconds(int keepAliveSeconds)

//Set the ThreadPoolExecutor's keep-alive seconds.

 void   setMaxPoolSize(int maxPoolSize)

//Set the ThreadPoolExecutor's maximum pool size.

 void   setQueueCapacity(int queueCapacity)

//Set the capacity for the ThreadPoolExecutor's BlockingQueue.

 void   setRejectedExecutionHandler(edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler rejectedExecutionHandler)

//Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.

 void   setThreadFactory(edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory threadFactory)

//Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.

 void   setThreadNamePrefix(String threadNamePrefix)

//Specify the prefix to use for the names of newly created threads.

 void   setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown)

//Set whether to wait for scheduled tasks to complete on shutdown.

 void   shutdown()

//Perform a shutdown on the ThreadPoolExecutor.

 

 

 

 

创建FTPProperties.java配置类

加载配置内容到Spring中,配置信息基本沿用我的就可以。

/** * FTP的配置信息 * @Auther: hrabbit * @Date: 2018-12-03 2:06 PM * @Description: */@Data@Component@PropertySource("classpath:ftp.properties")@ConfigurationProperties(prefix = "ftp")public class FTPProperties { private String username; private String password; private String host; private Integer port; private String baseUrl; private Integer passiveMode = FTP.BINARY_FILE_TYPE; private String encoding="UTF-8"; private int clientTimeout=120000; private int bufferSize; private int transferFileType=FTP.BINARY_FILE_TYPE; private boolean renameUploaded; private int retryTime;}

创建FTPClientPool线程池

/** * 自定义实现ftp连接池 * @Auther: hrabbit * @Date: 2018-12-03 3:40 PM * @Description: */@Slf4j@SuppressWarningspublic class FTPClientPool implements ObjectPool<FTPClient> { private static final int DEFAULT_POOL_SIZE = 10; public BlockingQueue<FTPClient> blockingQueue; private FTPClientFactory factory; public FTPClientPool(FTPClientFactory factory) throws Exception { this(DEFAULT_POOL_SIZE, factory); } public FTPClientPool(int poolSize, FTPClientFactory factory) throws Exception { this.factory = factory; this.blockingQueue = new ArrayBlockingQueue<FTPClient>; initPool; } /** * 初始化连接池 * @param maxPoolSize * 最大连接数 * @throws Exception */ private void initPool(int maxPoolSize) throws Exception { int count = 0; while(count < maxPoolSize) { this.addObject(); count++; } } /** * 从连接池中获取对象 */ @Override public FTPClient borrowObject() throws Exception { FTPClient client = blockingQueue.take(); if(client == null) { client = factory.makeObject(); } else if(!factory.validateObject { invalidateObject; client = factory.makeObject(); } return client; } /** * 返还一个对象 */ @Override public void returnObject(FTPClient client) throws Exception { if ((client != null) && !blockingQueue.offer(client,2,TimeUnit.MINUTES)) { try { factory.destroyObject; } catch (Exception e) { throw e; } } } /** * 移除无效的对象 */ @Override public void invalidateObject(FTPClient client) throws Exception { blockingQueue.remove; } /** * 增加一个新的链接,超时失效 */ @Override public void addObject() throws Exception { blockingQueue.offer(factory.makeObject(), 2, TimeUnit.MINUTES); } /** * 重新连接 */ public FTPClient reconnect() throws Exception { return factory.makeObject(); } /** * 获取空闲链接数 */ @Override public int getNumIdle() { return blockingQueue.size(); } /** * 获取正在被使用的链接数 */ @Override public int getNumActive() { return DEFAULT_POOL_SIZE - getNumIdle(); } @Override public void clear() throws Exception { } /** * 关闭连接池 */ @Override public void close() { try { while(blockingQueue.iterator().hasNext { FTPClient client = 
blockingQueue.take(); factory.destroyObject; } } catch(Exception e) { log.error("close ftp client pool failed...{}", e); } } /** * 增加一个新的链接,超时失效 */ public void addObject(FTPClient ftpClient) throws Exception { blockingQueue.put(ftpClient); }}

创建一个FTPClientFactory工厂类

创建FTPClientFactory实现PoolableObjectFactory接口,即FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁

/** * FTPClient 工厂 * @Auther: hrabbit * @Date: 2018-12-03 3:41 PM * @Description: */@Slf4j@SuppressWarningspublic class FTPClientFactory implements PoolableObjectFactory<FTPClient> { private FTPProperties ftpProperties; public FTPClientFactory(FTPProperties ftpProperties) { this.ftpProperties = ftpProperties; } @Override public FTPClient makeObject() throws Exception { FTPClient ftpClient = new FTPClient(); ftpClient.setControlEncoding(ftpProperties.getEncoding; ftpClient.setConnectTimeout(ftpProperties.getClientTimeout; try { ftpClient.connect(ftpProperties.getHost(), ftpProperties.getPort; int reply = ftpClient.getReplyCode(); if (!FTPReply.isPositiveCompletion { ftpClient.disconnect(); log.warn("FTPServer refused connection"); return null; } boolean result = ftpClient.login(ftpProperties.getUsername(), ftpProperties.getPassword; ftpClient.setFileType(ftpProperties.getTransferFileType; if  { log.warn("ftpClient login failed... username is {}", ftpProperties.getUsername; } } catch (Exception e) { log.error("create ftp connection failed...{}", e); throw e; } return ftpClient; } @Override public void destroyObject(FTPClient ftpClient) throws Exception { try { if(ftpClient != null && ftpClient.isConnected { ftpClient.logout(); } } catch (Exception e) { log.error("ftp client logout failed...{}", e); throw e; } finally { if(ftpClient != null) { ftpClient.disconnect(); } } } @Override public boolean validateObject(FTPClient ftpClient) { try { return ftpClient.sendNoOp(); } catch (Exception e) { log.error("Failed to validate client: {}"); } return false; } @Override public void activateObject(FTPClient obj) throws Exception { //Do nothing } @Override public void passivateObject(FTPClient obj) throws Exception { //Do nothing }}

创建FTPUtils.java的工具类

FTPUtils.java中封装了上传、下载等方法,在项目启动的时候,在@PostConstruct注解的作用下通过执行init()方法,创建FTPClientFactory工厂,并初始化了FTPClientPool连接池,这样每次调用方法的时候,都直接从FTPClientPool中取出一个FTPClient对象

/** * @Auther: hrabbit * @Date: 2018-12-03 3:47 PM * @Description: */@Slf4j@Componentpublic class FTPUtils { /** * FTP的连接池 */ @Autowired public static FTPClientPool ftpClientPool; /** * FTPClient对象 */ public static FTPClient ftpClient; private static FTPUtils ftpUtils; @Autowired private FTPProperties ftpProperties; /** * 初始化设置 * @return */ @PostConstruct public boolean init() { FTPClientFactory factory = new FTPClientFactory(ftpProperties); ftpUtils = this; try { ftpClientPool = new FTPClientPool; } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 获取连接对象 * @return * @throws Exception */ public static FTPClient getFTPClient() throws Exception { //初始化的时候从队列中取出一个连接 if (ftpClient==null) { synchronized (ftpClientPool) { ftpClient = ftpClientPool.borrowObject(); } } return ftpClient; } /** * 当前命令执行完成命令完成 * @throws IOException */ public void complete() throws IOException { ftpClient.completePendingCommand(); } /** * 当前线程任务处理完成,加入到队列的最后 * @return */ public void disconnect() throws Exception { ftpClientPool.addObject(ftpClient); } /** * Description: 向FTP服务器上传文件 * * @Version1.0 * @param remoteFile * 上传到FTP服务器上的文件名 * @param input * 本地文件流 * @return 成功返回true,否则返回false */ public static boolean uploadFile(String remoteFile, InputStream input) { boolean result = false; try { getFTPClient(); ftpClient.enterLocalPassiveMode(); result = ftpClient.storeFile(remoteFile, input); input.close(); ftpClient.disconnect(); } catch (Exception e) { e.printStackTrace(); } return result; } /** * Description: 向FTP服务器上传文件 * * @Version1.0 * @param remoteFile * 上传到FTP服务器上的文件名 * @param localFile * 本地文件 * @return 成功返回true,否则返回false */ public static boolean uploadFile(String remoteFile, String localFile){ FileInputStream input = null; try { input = new FileInputStream(new File(localFile)); } catch (FileNotFoundException e) { e.printStackTrace(); } return uploadFile(remoteFile, input); } /** * 拷贝文件 * @param fromFile * @param toFile * @return * @throws Exception */ public 
boolean copyFile(String fromFile, String toFile) throws Exception { InputStream in=getFileInputStream; getFTPClient(); boolean flag = ftpClient.storeFile(toFile, in); in.close(); return flag; } /** * 获取文件输入流 * @param fileName * @return * @throws IOException */ public static InputStream getFileInputStream(String fileName) throws Exception { ByteArrayOutputStream fos=new ByteArrayOutputStream(); getFTPClient(); ftpClient.retrieveFile(fileName, fos); ByteArrayInputStream in=new ByteArrayInputStream(fos.toByteArray; fos.close(); return in; } /** * Description: 从FTP服务器下载文件 * * @Version1.0 * @return */ public static boolean downFile(String remoteFile, String localFile){ boolean result = false; try { getFTPClient(); OutputStream os = new FileOutputStream(localFile); ftpClient.retrieveFile(remoteFile, os); ftpClient.logout(); ftpClient.disconnect(); result = true; } catch (Exception e) { e.printStackTrace(); } finally { try { } catch (Exception e) { e.printStackTrace(); } } return result; } /** * 从ftp中获取文件流 * @param filePath * @return * @throws Exception */ public static InputStream getInputStream(String filePath) throws Exception { getFTPClient(); InputStream inputStream = ftpClient.retrieveFileStream; return inputStream; } /** * ftp中文件重命名 * @param fromFile * @param toFile * @return * @throws Exception */ public boolean rename(String fromFile,String toFile) throws Exception { getFTPClient(); boolean result = ftpClient.rename(fromFile,toFile); return result; } /** * 获取ftp目录下的所有文件 * @param dir * @return */ public FTPFile[] getFiles(String dir) throws Exception { getFTPClient(); FTPFile[] files = new FTPFile[0]; try { files = ftpClient.listFiles; }catch (Throwable thr){ thr.printStackTrace(); } return files; } /** * 获取ftp目录下的某种类型的文件 * @param dir * @param filter * @return */ public FTPFile[] getFiles(String dir, FTPFileFilter filter) throws Exception { getFTPClient(); FTPFile[] files = new FTPFile[0]; try { files = ftpClient.listFiles(dir, filter); }catch (Throwable thr){ 
thr.printStackTrace(); } return files; } /** * 创建文件夹 * @param remoteDir * @return 如果已经有这个文件夹返回false */ public boolean makeDirectory(String remoteDir) throws Exception { getFTPClient(); boolean result = false; try { result = ftpClient.makeDirectory(remoteDir); } catch (IOException e) { e.printStackTrace(); } return result; } public boolean mkdirs(String dir) throws Exception { boolean result = false; if (null == dir) { return result; } getFTPClient(); ftpClient.changeWorkingDirectory; StringTokenizer dirs = new StringTokenizer; String temp = null; while (dirs.hasMoreElements { temp = dirs.nextElement().toString(); //创建目录 ftpClient.makeDirectory; //进入目录 ftpClient.changeWorkingDirectory; result = true; } ftpClient.changeWorkingDirectory; return result; }}

创建FtpClientTest.java测试类

上传一张图片到FTP服务器,并将文件重命名为hrabbit.jpg,代码如下:

/** * FtpClient测试 * @Auther: hrabbit * @Date: 2018-12-21 9:14 PM * @Description: */@RunWith(SpringRunner.class)@SpringBootTestpublic class FtpClientTest { /** * 测试上传 */ @Test public void uploadFile(){ boolean flag = FTPUtils.uploadFile("hrabbit.jpg", "/Users/mrotaku/Downloads/klklklkl_4x.jpg"); Assert.assertEquals(true, flag); }}

程序完美运行,这时候我们查看我们的FTP服务器,http://localhost:8866/hrabbit.jpg

图片 2image.png

码云地址:https://gitee.com/hrabbit/hrabbit-admin简书地址:
https://www.jianshu.com/u/c0657ceed065

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图
Copyright @ 2010-2019 韦德国际手机网站 版权所有