import java.io.*; import java.net.*; import java.sql.*; import java.util.*; import java.util.zip.*; /** * Класс Agent в автоматическом режиме должен проверять зарегистрированные в таблице 'dir_index' * пути на работоспособность и отыскивать новые, ранее не индексированные директории.<p> * Дополнительной задачей является сбор из файлов, содержащих текст * (*.txt *.html *.htm *.nfo [*.mp3 *.zip *.gz]) информацию, выделение из неё смысловой части * и внесение этой части в таблицу ассоциаций 'аssociations' в соответствием с названиями * директорий из пути этого файла.<p> * В работе используется ДВ MySQL, формат указанных таблиц: <pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;"> CREATE TABLE servers ( UID int(11) NOT NULL auto_increment, host varchar(255), port int(11) NOT NULL default '21', user varchar(255) NOT NULL default 'anonymous', pass varchar(255) NOT NULL default 'ymi@FTPSearch', about varchar(255), indexStart int(11), indexEnd int(11), lastAttempt int(11), online int(1), PRIMARY KEY (UID) ) TYPE=MyISAM; // UID - уникальный индетификатор // host, port - хост и порт сервера // user, pass - имя и пароль к аккаунту // about - дополнительная информация о сервере и его содержимом // indexStart, indexEnd - время последнего начала индекса и его конца // lastAttempt - последняя попытка доступа к серверу // online - состояние доступности сервера в сети на данный момент </pre> <p> <pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;"> CREATE TABLE dir_index_[serverID] ( UID int(11) NOT NULL auto_increment, serverID int(11), serverText text, path text, crc varchar(32), name varchar(255), level int(11), time int(11), filenameList text, filesizeList text, dirSize varchar(255), dirText text, PRIMARY KEY (UID) ) TYPE=MyISAM; // serverID - ID сервера, таблица создаётся автоматически // UID - уникальный индетификатор директории // serverText - строка доступа к серверу следующего типа: ftp://name@host:port // path - путь до директории от корня сервера, вместе с serverText создаёт абсолютный URL на эту директорию // name - название директории // level - уровень вложености; 0 - для корневой директории, далее - по нарастающей // time - время данного сканирования // filenameList - список названий файлов, содержащихся в данной директории; названия идут через символ '\n' // filesizeList - список размеров файлов, дополняет список filenameList // dirSize - сумма размеров всех файлов, содержащихся в директории // dirText - то же самое, что заносится в ассоциации </pre> <p> <pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;"> CREATE TABLE small_texts_[serverID] ( UID int(11) NOT NULL auto_increment, crc VARCHAR(32), name varchar(255), content text, PRIMARY KEY (UID) ) TYPE=MyISAM; // UID - уникальный индетификатор // crc - CRC32 для path данного текстового файла // name - имя файла // content - содержимое файла </pre> <p> <pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;"> CREATE TABLE associations ( UID int(11) NOT NULL auto_increment, crc VARCHAR(32), AgentID int(11), content text, range1 varchar(255), range2 varchar(255), range3 varchar(255), PRIMARY KEY (UID) ) TYPE=MyISAM; // UID - уникальный индетификатор // crc - CRC32 код от пути директории, где был текст // AgentID - UIN агента, установившего ассоциацию // content - текстовое содержимое ассоциации // range1, range2, range3 - куски текста (слова), с которыми ассоциирован данный текст </pre> <p> <pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;"> CREATE TABLE user_associations ( UID int(11) NOT NULL auto_increment, time int(11), request varchar(255), content varchar(255), crc varchar(32), PRIMARY KEY (UID), UNIQUE KEY crc (crc) ) TYPE=MyISAM; // UID - уникальный индетификатор // time - время создания ассоциации // request - регулярное выражение по мотивам имени директории // content - текстовое содержимое ассоциации // crc - CRC32 код от request и content соединённых через символ "/" для предотвращения повторов </pre> */ class Agent extends Thread { Spy spyPool[]; Connection c; /** * Запуск из коммандной строки. * * <p>Синтаксис запуска: * <br><b>Agent [max Spyes <i>(5)</i>] [DBHost <i>(localhost)</i>] [DB <i>(FTPSearch)</i>] [user <i>(root)</i>] [pass]</b><br><br> * <br> <b>max Spyes</b> - задаёт максимальное количество одновременно работающих "шпионов" * <br> <b>DBHost</b> - хост сервера MySQL * <br> <b>user</b> - имя вашего пользователя в базе * <br> <b>pass</b> - пароль вашего пользователя в базе * <br> <b>DB</b> - имя рабочей базы данных */ public static void main(String args[]) { if(args.length < 4) { System.out.println("Wrong number of params. Use:\nAgent [max_Spyes (5)] [DBHost (localhost)] [DB (FTPSearch)] [user (root)] [pass]"); return; } String pass = ""; if(args.length > 4) pass = args[4]; try { int maxSpyes = Integer.parseInt(args[0]); new Agent(maxSpyes, args[1], args[3], pass, args[2]); } catch(Exception e){System.out.println("Wrong params: "+e);} } /** * Запуск агента индексирования. * * @param max_Spyes задаёт максимальное количество одновременно работающих "шпионов"<br> * @param DBHost хост сервера MySQL<br> * @param user имя вашего пользователя в базе<br> * @param pass пароль вашего пользователя в базе<br> * @param DB имя рабочей базы данных<br> * @exception Exception если проблемы применения параметров * @see Agent#run() */ public Agent(int maxSpyes, String DBHost, String user, String pass, String DB) throws Exception { Class.forName("org.gjt.mm.mysql.Driver").newInstance(); c = DriverManager.getConnection("jdbc:mysql://"+DBHost+"/"+DB+"?user="+user+"&password="+pass); spyPool = new Spy[maxSpyes]; for(int i = 0; i < maxSpyes; i++) spyPool[i] = null; setPriority(NORM_PRIORITY); start(); } /** * проверка статуса серверов online/offline * @see Spy */ protected void startScanners() { try { Statement stmt = c.createStatement(); ResultSet rs = stmt.executeQuery("SELECT host, port FROM servers WHERE UID > 0"); while(rs.next()) new Scaner(rs.getString(1),rs.getInt(2)); rs.close(); stmt.close(); } catch(Exception e){} } /** * Тело агента индексирования. * Эта функция запускает "шпионов" и следит за тем, * чтобы индексирующие шпионы не "подвисали". */ public void run() { int i; boolean spyStarted; while(true) { // каждый раз вначале запускаем сканирование директорий startScanners(); // потом обходим весь пул "шпионов" spyStarted = false; for(i = 0; i < spyPool.length; i++) if((spyPool[i] == null) && !spyStarted) // и запускаем одного нового шпиона, если в пуле есть свободное место { spyPool[i] = new Spy(i); spyStarted = true; } else if(spyPool[i] != null) // если же шпион уже вот как 5 минут бездействует, то производим его рестарт if(spyPool[i].lastEvent < (new java.util.Date()).getTime() - 300000) spyPool[i].restart(); System.gc(); try{sleep(300000);} catch(Exception e){} } } /** Сканер активности сервера с определённым IP на определённом порту. */ class Scaner extends Thread { String IP; int port; public Scaner(String IP, int port) { this.IP = IP; this.port = port; start(); } public void run() { String query = ""; try { Socket s = new Socket(IP, port); s.close(); query = "UPDATE servers SET online=1 WHERE host='"+IP+"' AND port="+port; } catch(Exception e) { query = "UPDATE servers SET online=0 WHERE host='"+IP+"' AND port="+port; } try { Statement stmt = c.createStatement(); stmt.executeQuery(query); stmt.close(); } catch(Exception e){} } } /** * Индексирующий "шпион": рекурсивно обходит всю файловую систему сервера * и заносит данные в БД. */ protected class Spy extends Thread { /** время последнего события индексатора (миллисекунды) */ public long lastEvent; protected Socket socket1, socket2; protected int serverID, slotNum, UIN, port, reconnections; protected String host, user, pass, serverText, lastDir; protected FileOutputStream log; protected boolean notFromHome; /** * в параметрах инициализайии передаётся номер слота * в пуле "шпионов" класса Agent */ public Spy(int slotNum) { this.slotNum = slotNum; eventTime(); UIN = (int)(lastEvent/1000); socket1 = null; socket2 = null; serverID = 0; eventTime(); setDaemon(false); setPriority(NORM_PRIORITY); start(); } /** пишет лог событий */ synchronized protected void logOut(String str) { if(!str.endsWith("\n")) str = str+"\r\n"; //System.out.print(str); try{log.write(str.getBytes());} catch(Exception e){} } /** обновляет время последнего соединения с интиресующим сервером */ protected void eventTime() {lastEvent = (new java.util.Date()).getTime();} /** * простенькая функция рестарта, * принцип работы: закрываем активные сокеты и переоткрываем их */ public void restart() { eventTime(); try{socket1.close();} catch(Exception e){} try{socket2.close();} catch(Exception e){} reconnect(); } /** * Простой обработчик InputStream. Предпринимает попытку прочесть 64кб из потока, * после чего из прочитанных данных создаёт строку. * * @return строка, прочитанная из InputStream * @exception Exception на случай ошибки потока или прочтения нуля байт и попытки из этого 0 создать новую строку */ protected String read(InputStream is) throws Exception { byte buf[] = new byte[64*1024]; int r = is.read(buf); //logOut("< "+(new String(buf, 0, r))); return new String(buf, 0, r); } /** * Простой обработчик InputStream. Читает ровно lng байт из потока и возвращает их. * * @return lng байт из потока * @exception Exception на случай ошибки потока или задания неверного значения lng */ protected byte[] read(InputStream is, int lng) { byte buf[] = new byte[lng]; int off = 0, r = 1; try { while((r > 0)&&(off < lng)) { r = is.read(buf, off, buf.length-off); if(r > 0) { off += r; if(buf[off-1] == 0) r = 0; } } } catch(Exception e){} if(off > 0) { byte b[] = new byte[off]; for(r = 0; r < off; r++) b[r] = buf[r]; buf = b; System.gc(); } return buf; } /** пишет запрос и тут же получает ответ */ protected void write(OutputStream os, String str) throws Exception { eventTime(); os.write(str.getBytes()); //logOut("> "+str); } /** * подготовка к запуску рекурсивного обхода: * Удаление старых записей, выбор сервера и запуск обработчика. */ public void run() { try { // удаляем все серверы, к которым небыло доступа в течении 15 суток Statement stmt = c.createStatement(); ResultSet rs = stmt.executeQuery("SELECT UID FROM servers WHERE indexStart < UNIX_TIMESTAMP()-1296000"); while(rs.next()) { try { Statement stmt2 = c.createStatement(); stmt2.executeQuery("DROP TABLE `dir_index_"+rs.getInt(1)+"`"); stmt2.close(); } catch(Exception e){} try { Statement stmt2 = c.createStatement(); stmt2.executeQuery("DROP TABLE `dir_temp_index_"+rs.getInt(1)+"`"); stmt2.close(); } catch(Exception e){} try { Statement stmt2 = c.createStatement(); stmt2.executeQuery("DROP TABLE `small_texts_"+serverID+"`"); stmt2.close(); } catch(Exception e){} try { Statement stmt2 = c.createStatement(); stmt2.executeQuery("DROP TABLE `small_texts_temp_"+serverID+"`"); stmt2.close(); } catch(Exception e){} } stmt.close(); // создаём список серверов, что уже обрабатываются String servers = "0"; for(int i = 0; i < spyPool.length; i++) if(spyPool[i] != null) { try{servers = servers + "," + spyPool[i].serverID;} catch(Exception e){} } // берём сервер, который не проверяли уже в течении 8 часов и который по результатам работы сканера онлайн, так же сервер не должен уже находиться в обработке stmt = c.createStatement(); rs = stmt.executeQuery("SELECT UID, host, port, user, pass, indexStart, indexEnd FROM servers WHERE ((((lastAttempt < UNIX_TIMESTAMP()-600)AND(indexStart < UNIX_TIMESTAMP()-600))AND(indexEnd < UNIX_TIMESTAMP()-57600))AND(UID NOT IN("+servers+")))AND(online = 1) ORDER BY lastAttempt"); if(!rs.next()) // если такого сервера нет - то чистка и выход { rs.close(); stmt.close(); eventTime(); spyPool[slotNum] = null; return; } // заполняем данные о сервере serverID = rs.getInt(1); host = rs.getString(2); port = rs.getInt(3); user = rs.getString(4); pass = rs.getString(5); notFromHome = rs.getInt(6) < rs.getInt(7); rs.close(); stmt.close(); // обновляем время последней попытки доступа stmt = c.createStatement(); stmt.executeQuery("UPDATE servers SET lastAttempt = UNIX_TIMESTAMP() WHERE UID="+serverID); stmt.close(); // если в прошлый раз индекс был не доведён до конца, // то пытаемся найти на чём остановились и продолжить if(notFromHome) { try { stmt = c.createStatement(); rs = stmt.executeQuery("SELECT path FROM dir_temp_index_"+serverID+" WHERE 1 ORDER BY -time LIMIT 0,1"); if(rs.next()) lastDir = rs.getString(1); else notFromHome = false; rs.close(); stmt.close(); } catch(Exception e){notFromHome = false;} } if(!notFromHome) { // если есть прошлые темповые таблицы - удаляем их try { stmt = c.createStatement(); stmt.executeQuery("DROP TABLE `dir_temp_index_"+serverID+"`"); stmt.close(); } catch(Exception e){} try { stmt = c.createStatement(); stmt.executeQuery("DROP TABLE `small_texts_temp_"+serverID+"`"); stmt.close(); } catch(Exception e){} // и создаём новые темповые таблицы try { stmt = c.createStatement(); stmt.executeQuery("CREATE TABLE dir_temp_index_"+serverID+"(UID int(11) NOT NULL auto_increment,serverID int(11),serverText text,path text,crc varchar(32),name varchar(255),level int(11),time int(11),filenameList text,filesizeList text,dirSize varchar(255),dirText text,PRIMARY KEY (UID))TYPE=MyISAM;"); stmt.close(); } catch(Exception e){} try { stmt = c.createStatement(); stmt.executeQuery("CREATE TABLE small_texts_temp_"+serverID+" (UID int(11) NOT NULL auto_increment, crc VARCHAR(32), name varchar(255), content text, PRIMARY KEY (UID)) TYPE=MyISAM;"); stmt.close(); } catch(Exception e){} } // создаём serverText serverText = "ftp://" + host; if(port != 21) serverText = serverText + ":" + port; // открываем лог log = new FileOutputStream("logs/log."+serverID+".txt", false); logOut(""+UIN+"---------------------\r\n"+serverText); reconnections = 0; // и запуск рекурсивного обработчика директорий try { // попытка соединения с сервером, если неудачная - то чистим и уходим if(!reconnect()) { logOut("X connection to "+user+"@"+host+":"+port+" closed"); log.close(); eventTime(); spyPool[slotNum] = null; return; } reconnections = 0; // обновляем время старта индекса stmt = c.createStatement(); stmt.executeQuery("UPDATE servers SET indexStart = UNIX_TIMESTAMP() WHERE UID="+serverID); stmt.close(); // и пытаемся создать индекс if(!spider(0, getDir(), "/", new long[255])) { // если не получилось - чистим и уходим logOut("X connection to "+user+"@"+host+":"+port+" closed"); log.close(); eventTime(); spyPool[slotNum] = null; return; } // если был установлен флаг продолжения индекса, // но директория, с которой продолжать так и не была найдена, то // удаляем темповые таблицы, чистим и уходим if(notFromHome) { try { stmt = c.createStatement(); stmt.executeQuery("DROP TABLE `dir_temp_index_"+serverID+"`"); stmt.close(); } catch(Exception e){} try { stmt = c.createStatement(); stmt.executeQuery("DROP TABLE `small_texts_temp_"+serverID+"`"); stmt.close(); } catch(Exception e){} log.close(); eventTime(); spyPool[slotNum] = null; return; } // если здесь, то всё прошло успешно, о чём и сообщаем logOut("*> task "+user+"@"+host+":"+port+" complete"); // удаляем главную таблицу try { stmt = c.createStatement(); stmt.executeQuery("DROP TABLE `dir_index_"+serverID+"`"); stmt.close(); } catch(Exception e){} try { stmt = c.createStatement(); stmt.executeQuery("DROP TABLE `small_texts_"+serverID+"`"); stmt.close(); } catch(Exception e){} // и переименовываем темповые таблицы в главные stmt = c.createStatement(); stmt.executeQuery("ALTER TABLE `dir_temp_index_"+serverID+"` RENAME `dir_index_"+serverID+"`"); stmt.close(); stmt = c.createStatement(); stmt.executeQuery("ALTER TABLE `small_texts_temp_"+serverID+"` RENAME `small_texts_"+serverID+"`"); stmt.close(); // указываем, что индекс завершён stmt = c.createStatement(); stmt.executeQuery("UPDATE servers SET indexEnd = UNIX_TIMESTAMP() WHERE UID="+serverID); stmt.close(); // на всякий случай оптимизируем таблицы: stmt = c.createStatement(); stmt.executeQuery("OPTIMIZE TABLE `user_associations`, `associations`, `servers`, `dir_index_"+serverID+"`, `small_texts_"+serverID+"`"); stmt.close(); } catch(Exception e) {logOut("X connection to "+user+"@"+host+":"+port+" closed\r\n"+e);} log.close(); } catch(Exception e){} eventTime(); spyPool[slotNum] = null; } /** * Функция для записи в OutputStream комманды и получения ответа из InputStream. * * @param str строка комманды, которую нужно записать в OutputStream * @exception Exception на случай ошибок потоков или пустой строки. */ protected String write(String str) throws Exception { write(socket1.getOutputStream(), str); return read(socket1.getInputStream()); } /** 4 попытки каждые 10 секунд + 1 через 10 минут переконнектится к серверу */ protected boolean reconnect() { reconnections++; if(reconnections == 2) return false; eventTime(); for(int i = 0; i < 4; i++) { if(login()) return true; logOut("[] sleep 10 sec."); try{sleep(10000);} catch(Exception e){}; } logOut("[] sleep 10 min."); try{sleep(600000);} catch(Exception e){}; return login(); } /** * Вход на сервер. * Неудачей соединения с сервером считается обрыв связи и непринятые user/pass. * При удачном соединении производится попытка перейти в корень сервера (CD /). */ protected boolean login() { String dt = ""; int h = Calendar.getInstance().get(Calendar.HOUR_OF_DAY); int m = Calendar.getInstance().get(Calendar.MINUTE); if(h < 10) dt = "0"+h; else dt = ""+h; dt = dt+":"; if(m < 10) dt = dt+"0"+m; else dt = dt+m; logOut("** ["+dt+"]"+" connect to "+user+"@"+host+":"+port); try { String reply; socket1 = new Socket(); socket1.connect(new InetSocketAddress(host, port), 30000); InputStream is = socket1.getInputStream(); OutputStream os = socket1.getOutputStream(); // read server "hello" reply = write("NOOP\r\n"); if(reply.indexOf("220") == 0) if(reply.indexOf("\n200") < 0) read(is); // send user reply = write("USER "+user+"\r\n"); if((reply.indexOf("4") == 0)||(reply.indexOf("5") == 0)) { logOut("X fail to connect to "+user+"@"+host+":"+port); return false; } // send pass reply = write("PASS "+pass+"\r\n"); if((reply.indexOf("4") == 0)||(reply.indexOf("5") == 0)) { logOut("X fail to connect to "+user+"@"+host+":"+port); return false; } // set REPRESENTATION TYPE as ASCII reply = write("TYPE A\r\n"); return true; } catch(Exception e){} logOut("X fail to connect to "+user+"@"+host+":"+port); return false; } /** Получение коммандой PWD текущей директории сервера. */ protected String getDir() throws Exception { String dir = write("PWD\r\n"); dir = dir.substring(dir.indexOf('"')+1); return dir.substring(0, dir.indexOf('"')); } /** * Получение коммандой PASV сокета пассивного соединения или null, * если не поддерживается. */ protected Socket getPasv() { try { String reply = write("PASV\r\n"); while(reply.indexOf("2") != 0) { sleep(1000); reply = write("PASV\r\n"); } int s = reply.indexOf("(")+1, e = reply.indexOf(")", s); reply = reply.substring(s, e); s = reply.indexOf(","); String host = reply.substring(0, s); e = reply.indexOf(",", s+1); host = host+"."+reply.substring(s+1, e); s = reply.indexOf(",", e+1); host = host+"."+reply.substring(e+1, s); e = reply.indexOf(",", s+1); host = host+"."+reply.substring(s+1, e); s = reply.indexOf(",", e+1); String port1 = reply.substring(e+1, s); e = reply.indexOf(",", s+1); String port2 = reply.substring(s+1); int port = (Integer.parseInt(port1) << 8) | Integer.parseInt(port2); Socket socket = new Socket(); socket.connect(new InetSocketAddress(host, port), 30000); return socket; } catch(Exception e){} return null; } /** * получение листинга директории в активном режиме * (это запасной вариант, на случай, если нельзя получить в пассивном) */ protected String getListActiveMode() { try { ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLocalHost()); String reply = write("PORT "+ ss.getInetAddress().getHostAddress().replace('.',',')+ ","+((ss.getLocalPort()&0xff00)>>8)+ ","+(ss.getLocalPort()&0xff)+"\r\n"); if(reply.indexOf("4") == 0) return null; if(reply.indexOf("5") == 0) return null; reply = write("LIST\r\n"); socket2 = ss.accept(); byte buf[] = read(socket2.getInputStream(), 100*1024); int i = buf.length-1; while((i > 1)&&(buf[i] == 0)) i--; String answer = (new String(buf, 0, i)).trim()+"\n "; socket2.close(); ss.close(); if(reply.indexOf("\n1") < 0) if(reply.indexOf("\n2") < 0) if(reply.indexOf("\n3") < 0) if(reply.indexOf("\n4") < 0) read(socket1.getInputStream()); return answer; } catch(Exception ex){} return ""; } /** * Класс логически представляет файл на сервере, описание которого было полученно * коммандой LIST. * * <p> Класс создаётся в функции Agent.getList(). * @see Agent#getList() */ protected class FTPFile { /** названние файла */ public String name; /** размер файла */ public long size; /** является ли файл директорией */ public boolean isDir; /** Парсит строку, содержащую информацию о файле и заполняет поля класса. */ public FTPFile(String file) { if(file.trim().length() < 1) { isDir = true; name = "."; return; } file = file.trim(); isDir = file.charAt(0) == 'd'; if(!isDir) isDir = file.charAt(0) == 'D'; if(!isDir) { size = 0; int i = file.indexOf(" ", 31); if(i < 0) i = 43; try{size = Long.parseLong(file.substring(30, i).trim());} catch(Exception e1) { try{size = Long.parseLong(file.substring(30, 42).trim());} catch(Exception e2) { try{size = Long.parseLong(file.substring(30, 41).trim());} catch(Exception e3){} } } } else size = 0; name = file.substring(54).trim(); } } /** * Читает список текущей директории с помощью комманды LIST и возвращает список файлов. * @return возвращает список файлов в виде массива класса FTPFile * @see Agent.FTPFile */ synchronized protected FTPFile[] getList() throws Exception { String list, reply; socket2 = getPasv(); if(socket2 != null) { reply = write("LIST\r\n"); byte buf[] = read(socket2.getInputStream(), 100*1024); socket2.close(); int i = buf.length-1; while((i > 1)&&(buf[i] == 0)) i--; list = (new String(buf, 0, i)).trim()+"\n "; if(reply.indexOf("\n1") < 0) if(reply.indexOf("\n2") < 0) if(reply.indexOf("\n3") < 0) if(reply.indexOf("\n4") < 0) read(socket1.getInputStream()); } else list = getListActiveMode(); int i; int last = list.indexOf("\n"), c = 0; while(last >= 0) { c++; last = list.indexOf("\n", last+1); } if(c == 0) return null; FTPFile files[] = new FTPFile[c]; last = list.indexOf("\n"); c = 0; i = 0; while(last >= 0) { try{files[i] = new FTPFile(list.substring(c, last));} catch(Exception e){files[i] = new FTPFile("");} c = last; last = list.indexOf("\n", last+1); i++; } return files; } /** * простенькая функция, добавляющая c-like слэши для символов \,',",{,} * и так далее, используется для добавления строк в SQL */ protected String addSlashes(String text) { StringBuffer sb = new StringBuffer(text.length()*2); for(int i = 0; i < text.length(); i++) { if(text.charAt(i) == '"') sb.append("\\\""); else if(text.charAt(i) == '\'') sb.append("\\\'"); else if(text.charAt(i) == '\\') sb.append("\\\\"); else if(text.charAt(i) == '\n') sb.append("\\n"); else if(text.charAt(i) == '{') sb.append("-"); else if(text.charAt(i) == '}') sb.append("-"); else if(text.charAt(i) == '\r'); else sb.append(text.charAt(i)); } return sb.toString(); } /** * считает CRC32 код строки * (в этой версии программы используется уже не CRC32 алгоритм, * а Adler32 - как более быстрый, но название осталось старое) */ protected long getCRC32(String str) { Adler32 crc = new Adler32(); crc.update(str.getBytes()); return crc.getValue(); } /** * Функция рекурсивно перебирает директории и вносит их содержимое в БД. * * <p> Максмальная глубина просмотра директории - 255. * При завершении обработки директории функция пытается выполнить комманду CDUP. * При обрыве связи предпринимаются 4 попытки реконнекта, после чего - завершение функции. * * <p> При обработке файлов, выделяются файлы с расширениями .txt .diz .nfo * и если они менее 20 кб, то вызывается класс Associator, который их обрабатывает отдельно. * * @param level глубина просмотра от корневой директории * @param dir директория (полный путь, path), которую предлагается обработать серверу. * @param dirname имя директории, которую предлагается обработать серверу. * @param crc_list список CRC содержимого всех родительских директорий, используется, чтобы избежать зацикливания * @see Agent.Associator * @see Agent.Spy */ protected boolean spider(int level, String dir, String dirname, long[] crc_list) throws Exception { // максимальная глубина сканирования - 255 (это в основном связано с // реализайцией рекурсиии в Java - если поставить больше, то некоторые // JVM выдадут ошибку) if(level >= 255) return true; // попытка выбрать нужную директорию, если не получается (например нет прав доступа) // то возвращает ошибку String reply = write("CWD "+dir+"\r\n"); if(reply.indexOf("3") == 0) return false; if(reply.indexOf("4") == 0) return false; if(reply.indexOf("5") == 0) return false; dir = getDir(); if(dir.lastIndexOf("/") != dir.length()-1) dir = dir + "/"; // создание листа FTPFile list[] = getList(); if(list == null) if(reconnect()) list = getList(); if(list == null) return true; String fileList = "", fileSize = "", dirList = ""; long dirSize = 0; int textFiles = 0; for(int i = 0; i < list.length; i++) { if(!list[i].isDir) { dirList = dirList+list[i].name+"\n"+list[i].size+"\n"; fileList = fileList+list[i].name+"\n"; fileSize = fileSize+list[i].size+"\n"; dirSize += list[i].size; if(textFiles < 5) { String nm = list[i].name.toLowerCase(); if( nm.endsWith(".txt")&& ((list[i].size <= 8*1024)&& (list[i].size >= 20))) textFiles++; } } else dirList = dirList+list[i].name+"\nD\n"; } // проверка содержимого на повторы по CRC32 от содержимого: long crc = getCRC32(dirList); for(int i = 0; i < level; i++) if(crc_list[i] == crc) return true; // начало обрабработки директории crc_list[level] = crc; crc = getCRC32(dir); logOut("|> "+dir); // режим "не с начала" - доиндекс сервера if(notFromHome) { // проверяем, совпадает ли имя текущей директории с именем директории, // откуда нужно производить доиндекс notFromHome = !lastDir.equals(dir); // если не совпадает, то проверяем, есть ли в пути директории, откуда нужно // производить доиндекс пути до текущей директории, и если нету - выходим // (нужно чтобы не сканировать рекурсивно лишние директории) if(notFromHome) { String dr = "/"; String dn[] = lastDir.split("/"); if(dn.length < level) return true; for(int i = 0; i < level; i++) dr = dr + dn[i] + "/"; if(!dr.equals(dir)) return true; } else { // если начиная с этой директории нужно рпоизводить доиндекс, // то вначале удаляем эту директорию из БД, чтоб избежать повторов try { Statement stmt = c.createStatement(); stmt.executeQuery("DELETE FROM dir_temp_index_"+serverID+" WHERE path LIKE '"+addSlashes(dir)+"'"); stmt.close(); } catch(Exception e){} try { Statement stmt = c.createStatement(); stmt.executeQuery("DELETE FROM small_texts_temp_"+serverID+" WHERE crc="+crc+"'"); stmt.close(); } catch(Exception e){} } } // если уже режим индекса, а не поиска доиндекса, то добавляем директорию в лист if(!notFromHome) { try { Statement stmt = c.createStatement(); stmt.executeQuery("INSERT INTO dir_temp_index_"+serverID+" VALUES(NULL, "+serverID+", '"+serverText+"', '"+addSlashes(dir)+"', '"+crc+"', '"+addSlashes(dirname)+"', "+level+", UNIX_TIMESTAMP(), '"+addSlashes(fileList)+"', '"+addSlashes(fileSize)+"', '"+dirSize+"', '')"); stmt.close(); } catch(Exception e){return false;} } // обработка списка файлов и директорий for(int i = 0; i < list.length; i++) { if(list[i].isDir) // если директория { if(!list[i].name.equals(".")) if(!list[i].name.equals("..")) { // и если это не ссылка на родительскую или текущую, то // две попытки рекурсивного скана try { if(spider(level+1, dir+list[i].name, list[i].name, crc_list)) reconnections = 0; } catch(Exception e) { try { if(!reconnect()) return false; if(spider(level+1, dir+list[i].name, list[i].name, crc_list)) reconnections = 0; else return false; } catch(Exception ex){return false;} } } } else if(!notFromHome) { // если это файл и уже режим индекса и файл менее 8 кб и заканчивается на // .nfo или .diz, или он заканчивается на .txt, меньше 8 кб и таких файлов // в директории менее 5 штук, то запускаем ассоциатор String nm = list[i].name.toLowerCase(); if( (nm.endsWith(".diz") || nm.endsWith(".nfo"))&& ((list[i].size <= 8*1024)&& (list[i].size >= 20))) new Associator(list[i].name, dir, crc, list[i].size); else if(((textFiles < 5)&& (nm.endsWith(".txt")))&& ((list[i].size <= 8*1024)&& (list[i].size >= 20))) new Associator(list[i].name, dir, crc, list[i].size); } } return true; } /** * класс скачивает переданный ему файл и ассоциирует его содержимое с * директорией, в которой этот файл находится */ protected class Associator extends Thread { String name, dir; long crc; long size; byte buf[]; /** * скачивает файл и запускает новый вычислительный поток, * в котором уже и ассоциирует */ public Associator(String name, String dir, long crc, long size) { this.name = name; this.dir = dir; this.crc = crc; this.size = size; try { logOut("|> get: "+dir+name); // вначале пробуем скачать в пассивном режиме Socket socket = getPasv(); if(socket != null) { String reply = write("RETR "+dir+name+"\r\n"); buf = read(socket.getInputStream(), (int) size); socket.close(); if(reply.indexOf("\n1") < 0) if(reply.indexOf("\n2") < 0) if(reply.indexOf("\n3") < 0) if(reply.indexOf("\n4") < 0) read(socket1.getInputStream()); } else // если не получилось в пассивном - попытка скачать в активном { ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLocalHost()); String reply = write("PORT "+ ss.getInetAddress().getHostAddress().replace('.',',')+ ","+((ss.getLocalPort()&0xff00)>>8)+ ","+(ss.getLocalPort()&0xff)+"\r\n"); if(reply.indexOf("4") == 0) return; if(reply.indexOf("5") == 0) return; reply = write("RETR "+dir+name+"\r\n"); Socket socket2 = ss.accept(); buf = read(socket.getInputStream(), (int) size); socket2.close(); ss.close(); if(reply.indexOf("\n1") < 0) if(reply.indexOf("\n2") < 0) if(reply.indexOf("\n3") < 0) if(reply.indexOf("\n4") < 0) read(socket1.getInputStream()); } // в случае успешной закачки - запуск вычислительного потока для обработки setDaemon(false); start(); } catch(Exception e){} } /** * обраюатывает файл и ассоциирует его с директорией */ public void run() { try { int i; String nm = name.toLowerCase(); // DOS -> WIN if(nm.endsWith(".nfo") || nm.endsWith(".diz")) { int win[] = { 0xC0,0xC1,0xC2,0xC3,0xC4,0xC5,0xA8,0xC6,0xC7, 0xC8,0xC9,0xCA,0xCB,0xCC,0xCD,0xCE,0xCF,0xD0, 0xD1,0xD2,0xD3,0xD4,0xD5,0xD6,0xD7,0xD8,0xD9, 0xDA,0xDB,0xDC,0xDD,0xDE,0xDF,0xE0,0xE1,0xE2, 0xE3,0xE4,0xE5,0xB8,0xE6,0xE7,0xE8,0xE9,0xEA, 0xEB,0xEC,0xED,0xEE,0xEF,0xF0,0xF1,0xF2,0xF3, 0xF4,0xF5,0xF6,0xF7,0xF8,0xF9,0xFA,0xFB,0xFC, 0xFD,0xFE,0xFF}; int dos[] = { 0x80,0x81,0x82,0x83,0x84,0x85,0xF0,0x86,0x87, 0x88,0x89,0x8A,0x8B,0x8C,0x8D,0x8E,0x8F,0x90, 0x91,0x92,0x93,0x94,0x95,0x96,0x97,0x98,0x99, 0x9A,0x9B,0x9C,0x9D,0x9E,0x9F,0xA0,0xA1,0xA2, 0xA3,0xA4,0xA5,0xF1,0xA6,0xA7,0xA8,0xA9,0xAA, 0xAB,0xAC,0xAD,0xAE,0xAF,0xE0,0xE1,0xE2,0xE3, 0xE4,0xE5,0xE6,0xE7,0xE8,0xE9,0xEA,0xEB,0xEC, 0xED,0xEE,0xEF}; for(i = 0; i < buf.length; i++) for(int j = 0; j < win.length; j++) if(buf[i] == (byte)dos[j]) buf[i]= (byte)win[j]; } String text = (new String(buf, 0, buf.length)).trim()+"\n "; // кладу файл в копилку Statement stmt; try { stmt = c.createStatement(); stmt.executeQuery("INSERT INTO small_texts_temp_"+serverID+" VALUES(NULL, '"+crc+"', '"+addSlashes(name)+"', '"+addSlashes(text)+"')"); stmt.close(); } catch(Exception e){logOut("error1: "+serverID+" - "+name);} text = ((new String(buf, 0, buf.length)).trim()+"\n ").toLowerCase(); // заменяем ненужные символы на пробелы char un[] = { 0x09,0x0A,0x0B,0x0C,0x0D,0x1C,0x1D,0x1E,0x1F,'~','`', '!','@','#','$','%','^','&','*','(',')','-','_','=', '+','|','\\','/','?','<','>',',','.','{','}','[',']', '\'','"',':',';','0','1','2','3','4','5','6','7','8', '9'}; for(i = 0; i < un.length; i++) text = text.replace(un[i], ' '); text = ' '+text+' '; // удаляем "длинные" пробелы и короткие (меньше 4х символов) слова StringBuffer sb = new StringBuffer(text.length()*2); sb.append(' '); int word; i = 0; while(i < text.length()-1) { word = 0; while((i+word < text.length()-1)&&(text.charAt(i+word) != ' ')) word++; if(word > 3) { sb.append(text.substring(i, i+word)); sb.append(' '); } i += word; while((i < text.length()-1)&&(text.charAt(i) == ' ')) i++; } text = sb.toString(); // разбиваем директорию на части по символу / String dn[] = dir.split("/"); String da[] = {""+UIN, ""+UIN, ""+UIN}; int j = 0; for(i = dn.length-1;(i > dn.length-3-1)&&(i > 0); i--) { da[j] = addSlashes(dn[i]); j++; } // вносим ассоциацию stmt = c.createStatement(); stmt.executeQuery("INSERT INTO associations VALUES(NULL, '"+crc+"', "+UIN+", '"+addSlashes(text)+"', '"+da[0]+"', '"+da[1]+"', '"+da[2]+"')"); stmt.close(); // удаляем устаревшие ассоциации stmt = c.createStatement(); stmt.executeQuery("DELETE FROM associations WHERE crc='"+crc+"' AND AgentID != "+UIN); stmt.close(); // обновляем ассоциацию с директорией stmt = c.createStatement(); stmt.executeQuery("UPDATE dir_temp_index_"+serverID+" SET dirText = CONCAT(dirText, ' "+text+"') WHERE crc='"+crc+"'"); stmt.close(); } catch(Exception e){logOut("error2: "+serverID+" - "+name);} } } } }