但是通常来说，解析和持久化可以分别用单线程的方式执行。
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Program entry point: starts the connect, parse and persistence handlers,
 * wires them together through three shared blocking queues of {@link Task}s,
 * and then prints running totals and per-second rates once a second.
 */
public class Probe {

    /** Number of threads in the pool that runs the connect handlers. */
    private static final int CONNECT_POOL_SIZE = 200;

    /** Number of connect handlers submitted to that pool at start-up. */
    private static final int CONNECT_HANDLER_COUNT = 600;

    /** Pause between two status lines, in milliseconds. */
    private static final long REPORT_PERIOD_MILLIS = 1000L;

    /** Nanoseconds per second, as a float so that divisions are not truncated. */
    private static final float NANOS_PER_SECOND = 1.0e9f;

    private static final BlockingQueue<Task> CONNECTLIST = new LinkedBlockingQueue<Task>();
    private static final BlockingQueue<Task> PARSELIST = new LinkedBlockingQueue<Task>();
    private static final BlockingQueue<Task> PERSISTENCELIST = new LinkedBlockingQueue<Task>();

    private static final ExecutorService CONNECTTHREADPOOL =
            Executors.newFixedThreadPool(CONNECT_POOL_SIZE);
    private static final ExecutorService PARSETHREADPOOL =
            Executors.newSingleThreadExecutor();
    private static final ExecutorService PERSISTENCETHREADPOOL =
            Executors.newFixedThreadPool(1);

    private static final List<String> DOMAINLIST = new CopyOnWriteArrayList<>();

    static {
        // "域名" means "domain name"; it looks like a placeholder for a real
        // host name -- TODO confirm before running against a real site.
        DOMAINLIST.add("域名");
    }

    /**
     * Seeds the connect queue with one task, submits the handlers to their
     * pools and then reports statistics forever (this method never returns
     * normally).
     *
     * @param args command-line arguments; not used
     * @throws Exception if the thread is interrupted while queueing or
     *                   sleeping, or if constructing a handler fails
     */
    public static void main(String[] args) throws Exception {
        // nanoTime is monotonic, so the elapsed time cannot jump when the
        // wall clock is adjusted.
        long start = System.nanoTime();

        CONNECTLIST.put(new Task("域名", 80, "/static/index.html"));

        // NOTE(review): more handlers are submitted than the pool has threads.
        // If ConnectHandler runs until the program ends, the surplus handlers
        // never start -- confirm this is intended.
        for (int i = 0; i < CONNECT_HANDLER_COUNT; i++) {
            CONNECTTHREADPOOL.submit(new ConnectHandler(CONNECTLIST, PARSELIST));
        }
        PARSETHREADPOOL.submit(
                new ParseHandler(CONNECTLIST, PARSELIST, PERSISTENCELIST, DOMAINLIST));
        PERSISTENCETHREADPOOL.submit(new PersistenceHandler(PERSISTENCELIST));

        while (true) {
            Thread.sleep(REPORT_PERIOD_MILLIS);
            // Floating-point division: integer division here would truncate
            // the elapsed time to whole seconds and inflate every rate.
            float elapsedSeconds = (System.nanoTime() - start) / NANOS_PER_SECOND;
            printStatus(elapsedSeconds);
        }
    }

    /**
     * Rewrites the current console line (leading carriage return) with the
     * totals, per-second rates and queue sizes of the three stages. The
     * labels are, in order: connections, parses and persisted items, each as
     * "total", "per second" and "queue remaining".
     *
     * @param elapsedSeconds seconds elapsed since start-up
     */
    private static void printStatus(float elapsedSeconds) {
        int connectTotal = ConnectHandler.GETCOUNT();
        int parseTotal = ParseHandler.GETCOUNT();
        int persistenceTotal = PersistenceHandler.GETCOUNT();

        System.out.print("\r连接总数:" + connectTotal
                + " \t每秒连接:" + perSecond(connectTotal, elapsedSeconds)
                + "\t连接队列剩余:" + CONNECTLIST.size()
                + " \t解析总数:" + parseTotal
                + " \t每秒解析:" + perSecond(parseTotal, elapsedSeconds)
                + "\t解析队列剩余:" + PARSELIST.size()
                + " \t持久化总数:" + persistenceTotal
                + " \t每秒持久化:" + perSecond(persistenceTotal, elapsedSeconds)
                + "\t持久化队列剩余:" + PERSISTENCELIST.size());
    }

    /**
     * Returns {@code total / elapsedSeconds} rounded to the nearest integer.
     *
     * @param total          number of items processed so far
     * @param elapsedSeconds seconds elapsed since start-up
     * @return the average number of items per second
     */
    private static int perSecond(int total, float elapsedSeconds) {
        return Math.round(total / elapsedSeconds);
    }
}

/**
 * Mutable holder for one unit of work: a host, a port and a path, together
 * with a type derived from the path and a few free-form bookkeeping fields
 * (content, state, task time) that this class only stores and returns.
 */
class Task {

    private String host;
    private int port;
    private String currentPath;
    private long taskTime;
    private String type;
    private String content;
    private int state;

    /** Creates an empty task; populate it with {@link #init} or the setters. */
    public Task() {
    }

    /**
     * Creates a task for the given host, port and path.
     *
     * @param host host name
     * @param port port number
     * @param path resource path, optionally followed by {@code ?query}
     * @throws NullPointerException if {@code path} is {@code null}
     */
    public Task(String host, int port, String path) {
        init(host, port, path);
    }

    /**
     * (Re)initialises host, port and path; the type is re-derived from the path.
     *
     * @param host host name
     * @param port port number
     * @param path resource path, optionally followed by {@code ?query}
     * @throws NullPointerException if {@code path} is {@code null}
     */
    public void init(String host, int port, String path) {
        this.setCurrentPath(path);
        this.host = host;
        this.port = port;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public String getCurrentPath() {
        return currentPath;
    }

    /**
     * Stores the path and sets the type to the path's file extension.
     *
     * <p>The extension is the text after the last {@code '.'} of the final
     * path segment, ignoring any {@code ?query} part; it is the empty string
     * when that segment has no dot. Examples: {@code /static/index.html} gives
     * {@code html}, {@code /a.min.js?v=1.2} gives {@code js},
     * {@code /search?q=a.b} gives the empty string.
     *
     * @param currentPath resource path, optionally followed by {@code ?query}
     * @throws NullPointerException if {@code currentPath} is {@code null}
     */
    public void setCurrentPath(String currentPath) {
        // Derive the type first so a failure leaves both fields untouched.
        String derivedType = extensionOf(currentPath);
        this.currentPath = currentPath;
        this.type = derivedType;
    }

    /** Returns the file extension of {@code path}, or "" if it has none. */
    private static String extensionOf(String path) {
        int queryStart = path.indexOf('?');
        // Cut the query off first: a dot inside it must not be taken for the
        // extension separator (the old code threw on "/search?q=a.b").
        String resource = queryStart >= 0 ? path.substring(0, queryStart) : path;
        int lastSlash = resource.lastIndexOf('/');
        int lastDot = resource.lastIndexOf('.');
        // Covers "no dot at all" (-1) and "dot only in a directory name".
        if (lastDot <= lastSlash) {
            return "";
        }
        return resource.substring(lastDot + 1);
    }

    public long getTaskTime() {
        return taskTime;
    }

    public void setTaskTime(long taskTime) {
        this.taskTime = taskTime;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

class ParseHandler implements Runnable { private static Set<String> SET = new ConcurrentSkipListSet<String>(); public static int GETCOUNT() { return COUNT.get(); } private static final AtomicInteger COUNT = new AtomicInteger(); private BlockingQueue<Task> connectlist; private BlockingQueue<Task> parselist; private BlockingQueue<Task> persistencelist; List<String> domainlist; |