import java.io.*;
import java.net.*;
import java.sql.*;
import java.util.*;
import java.util.zip.*;
/**
* Класс Agent в автоматическом режиме должен проверять зарегистрированные в таблице 'dir_index'
* пути на работоспособность и отыскивать новые, ранее не индексированные директории.<p>
* Дополнительной задачей является сбор из файлов, содержащих текст
* (*.txt *.html *.htm *.nfo [*.mp3 *.zip *.gz]) информацию, выделение из неё смысловой части
* и внесение этой части в таблицу ассоциаций 'аssociations' в соответствием с названиями
* директорий из пути этого файла.<p>
* В работе используется ДВ MySQL, формат указанных таблиц:
<pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;">
CREATE TABLE servers (
UID int(11) NOT NULL auto_increment,
host varchar(255),
port int(11) NOT NULL default '21',
user varchar(255) NOT NULL default 'anonymous',
pass varchar(255) NOT NULL default 'ymi@FTPSearch',
about varchar(255),
indexStart int(11),
indexEnd int(11),
lastAttempt int(11),
online int(1),
PRIMARY KEY (UID)
) TYPE=MyISAM;
// UID - уникальный индетификатор
// host, port - хост и порт сервера
// user, pass - имя и пароль к аккаунту
// about - дополнительная информация о сервере и его содержимом
// indexStart, indexEnd - время последнего начала индекса и его конца
// lastAttempt - последняя попытка доступа к серверу
// online - состояние доступности сервера в сети на данный момент
</pre>
<p>
<pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;">
CREATE TABLE dir_index_[serverID] (
UID int(11) NOT NULL auto_increment,
serverID int(11),
serverText text,
path text,
crc varchar(32),
name varchar(255),
level int(11),
time int(11),
filenameList text,
filesizeList text,
dirSize varchar(255),
dirText text,
PRIMARY KEY (UID)
) TYPE=MyISAM;
// serverID - ID сервера, таблица создаётся автоматически
// UID - уникальный индетификатор директории
// serverText - строка доступа к серверу следующего типа: ftp://name@host:port
// path - путь до директории от корня сервера, вместе с serverText создаёт абсолютный URL на эту директорию
// name - название директории
// level - уровень вложености; 0 - для корневой директории, далее - по нарастающей
// time - время данного сканирования
// filenameList - список названий файлов, содержащихся в данной директории; названия идут через символ '\n'
// filesizeList - список размеров файлов, дополняет список filenameList
// dirSize - сумма размеров всех файлов, содержащихся в директории
// dirText - то же самое, что заносится в ассоциации
</pre>
<p>
<pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;">
CREATE TABLE small_texts_[serverID] (
UID int(11) NOT NULL auto_increment,
crc VARCHAR(32),
name varchar(255),
content text,
PRIMARY KEY (UID)
) TYPE=MyISAM;
// UID - уникальный индетификатор
// crc - CRC32 для path данного текстового файла
// name - имя файла
// content - содержимое файла
</pre>
<p>
<pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;">
CREATE TABLE associations (
UID int(11) NOT NULL auto_increment,
crc VARCHAR(32),
AgentID int(11),
content text,
range1 varchar(255),
range2 varchar(255),
range3 varchar(255),
PRIMARY KEY (UID)
) TYPE=MyISAM;
// UID - уникальный индетификатор
// crc - CRC32 код от пути директории, где был текст
// AgentID - UIN агента, установившего ассоциацию
// content - текстовое содержимое ассоциации
// range1, range2, range3 - куски текста (слова), с которыми ассоциирован данный текст
</pre>
<p>
<pre style="background-color: #f0f0f0;font-family: Courier New, Times;font-size: 12px;padding: 3px;">
CREATE TABLE user_associations (
UID int(11) NOT NULL auto_increment,
time int(11),
request varchar(255),
content varchar(255),
crc varchar(32),
PRIMARY KEY (UID),
UNIQUE KEY crc (crc)
) TYPE=MyISAM;
// UID - уникальный индетификатор
// time - время создания ассоциации
// request - регулярное выражение по мотивам имени директории
// content - текстовое содержимое ассоциации
// crc - CRC32 код от request и content соединённых через символ "/" для предотвращения повторов
</pre>
*/
class Agent extends Thread
{
Spy spyPool[];
Connection c;
/**
* Запуск из коммандной строки.
*
* <p>Синтаксис запуска:
* <br><b>Agent [max Spyes <i>(5)</i>] [DBHost <i>(localhost)</i>] [DB <i>(FTPSearch)</i>] [user <i>(root)</i>] [pass]</b><br><br>
* <br> <b>max Spyes</b> - задаёт максимальное количество одновременно работающих "шпионов"
* <br> <b>DBHost</b> - хост сервера MySQL
* <br> <b>user</b> - имя вашего пользователя в базе
* <br> <b>pass</b> - пароль вашего пользователя в базе
* <br> <b>DB</b> - имя рабочей базы данных
*/
public static void main(String args[])
{
if(args.length < 4)
{
System.out.println("Wrong number of params. Use:\nAgent [max_Spyes (5)] [DBHost (localhost)] [DB (FTPSearch)] [user (root)] [pass]");
return;
}
String pass = "";
if(args.length > 4) pass = args[4];
try
{
int maxSpyes = Integer.parseInt(args[0]);
new Agent(maxSpyes, args[1], args[3], pass, args[2]);
}
catch(Exception e){System.out.println("Wrong params: "+e);}
}
/**
* Запуск агента индексирования.
*
* @param max_Spyes задаёт максимальное количество одновременно работающих "шпионов"<br>
* @param DBHost хост сервера MySQL<br>
* @param user имя вашего пользователя в базе<br>
* @param pass пароль вашего пользователя в базе<br>
* @param DB имя рабочей базы данных<br>
* @exception Exception если проблемы применения параметров
* @see Agent#run()
*/
public Agent(int maxSpyes, String DBHost, String user, String pass, String DB) throws Exception
{
Class.forName("org.gjt.mm.mysql.Driver").newInstance();
c = DriverManager.getConnection("jdbc:mysql://"+DBHost+"/"+DB+"?user="+user+"&password="+pass);
spyPool = new Spy[maxSpyes];
for(int i = 0; i < maxSpyes; i++) spyPool[i] = null;
setPriority(NORM_PRIORITY);
start();
}
/**
* проверка статуса серверов online/offline
* @see Spy
*/
protected void startScanners()
{
try
{
Statement stmt = c.createStatement();
ResultSet rs = stmt.executeQuery("SELECT host, port FROM servers WHERE UID > 0");
while(rs.next())
new Scaner(rs.getString(1),rs.getInt(2));
rs.close();
stmt.close();
}
catch(Exception e){}
}
/**
* Тело агента индексирования.
* Эта функция запускает "шпионов" и следит за тем,
* чтобы индексирующие шпионы не "подвисали".
*/
public void run()
{
int i;
boolean spyStarted;
while(true)
{
// каждый раз вначале запускаем сканирование директорий
startScanners();
// потом обходим весь пул "шпионов"
spyStarted = false;
for(i = 0; i < spyPool.length; i++)
if((spyPool[i] == null) && !spyStarted) // и запускаем одного нового шпиона, если в пуле есть свободное место
{
spyPool[i] = new Spy(i);
spyStarted = true;
}
else if(spyPool[i] != null) // если же шпион уже вот как 5 минут бездействует, то производим его рестарт
if(spyPool[i].lastEvent < (new java.util.Date()).getTime() - 300000)
spyPool[i].restart();
System.gc();
try{sleep(300000);}
catch(Exception e){}
}
}
/** Сканер активности сервера с определённым IP на определённом порту. */
class Scaner extends Thread
{
String IP;
int port;
public Scaner(String IP, int port)
{
this.IP = IP;
this.port = port;
start();
}
public void run()
{
String query = "";
try
{
Socket s = new Socket(IP, port);
s.close();
query = "UPDATE servers SET online=1 WHERE host='"+IP+"' AND port="+port;
}
catch(Exception e)
{
query = "UPDATE servers SET online=0 WHERE host='"+IP+"' AND port="+port;
}
try
{
Statement stmt = c.createStatement();
stmt.executeQuery(query);
stmt.close();
}
catch(Exception e){}
}
}
/**
* Индексирующий "шпион": рекурсивно обходит всю файловую систему сервера
* и заносит данные в БД.
*/
protected class Spy extends Thread
{
/** время последнего события индексатора (миллисекунды) */
public long lastEvent;
protected Socket socket1, socket2;
protected int serverID, slotNum, UIN, port, reconnections;
protected String host, user, pass, serverText, lastDir;
protected FileOutputStream log;
protected boolean notFromHome;
/**
* в параметрах инициализайии передаётся номер слота
* в пуле "шпионов" класса Agent
*/
public Spy(int slotNum)
{
this.slotNum = slotNum;
eventTime();
UIN = (int)(lastEvent/1000);
socket1 = null;
socket2 = null;
serverID = 0;
eventTime();
setDaemon(false);
setPriority(NORM_PRIORITY);
start();
}
/** пишет лог событий */
synchronized protected void logOut(String str)
{
if(!str.endsWith("\n")) str = str+"\r\n";
//System.out.print(str);
try{log.write(str.getBytes());}
catch(Exception e){}
}
/** обновляет время последнего соединения с интиресующим сервером */
protected void eventTime()
{lastEvent = (new java.util.Date()).getTime();}
/**
* простенькая функция рестарта,
* принцип работы: закрываем активные сокеты и переоткрываем их
*/
public void restart()
{
eventTime();
try{socket1.close();}
catch(Exception e){}
try{socket2.close();}
catch(Exception e){}
reconnect();
}
/**
* Простой обработчик InputStream. Предпринимает попытку прочесть 64кб из потока,
* после чего из прочитанных данных создаёт строку.
*
* @return строка, прочитанная из InputStream
* @exception Exception на случай ошибки потока или прочтения нуля байт и попытки из этого 0 создать новую строку
*/
protected String read(InputStream is) throws Exception
{
byte buf[] = new byte[64*1024];
int r = is.read(buf);
//logOut("< "+(new String(buf, 0, r)));
return new String(buf, 0, r);
}
/**
* Простой обработчик InputStream. Читает ровно lng байт из потока и возвращает их.
*
* @return lng байт из потока
* @exception Exception на случай ошибки потока или задания неверного значения lng
*/
protected byte[] read(InputStream is, int lng)
{
byte buf[] = new byte[lng];
int off = 0, r = 1;
try
{
while((r > 0)&&(off < lng))
{
r = is.read(buf, off, buf.length-off);
if(r > 0)
{
off += r;
if(buf[off-1] == 0) r = 0;
}
}
}
catch(Exception e){}
if(off > 0)
{
byte b[] = new byte[off];
for(r = 0; r < off; r++) b[r] = buf[r];
buf = b;
System.gc();
}
return buf;
}
/** пишет запрос и тут же получает ответ */
protected void write(OutputStream os, String str) throws Exception
{
eventTime();
os.write(str.getBytes());
//logOut("> "+str);
}
/**
* подготовка к запуску рекурсивного обхода:
* Удаление старых записей, выбор сервера и запуск обработчика.
*/
public void run()
{
try
{
// удаляем все серверы, к которым небыло доступа в течении 15 суток
Statement stmt = c.createStatement();
ResultSet rs = stmt.executeQuery("SELECT UID FROM servers WHERE indexStart < UNIX_TIMESTAMP()-1296000");
while(rs.next())
{
try
{
Statement stmt2 = c.createStatement();
stmt2.executeQuery("DROP TABLE `dir_index_"+rs.getInt(1)+"`");
stmt2.close();
}
catch(Exception e){}
try
{
Statement stmt2 = c.createStatement();
stmt2.executeQuery("DROP TABLE `dir_temp_index_"+rs.getInt(1)+"`");
stmt2.close();
}
catch(Exception e){}
try
{
Statement stmt2 = c.createStatement();
stmt2.executeQuery("DROP TABLE `small_texts_"+serverID+"`");
stmt2.close();
}
catch(Exception e){}
try
{
Statement stmt2 = c.createStatement();
stmt2.executeQuery("DROP TABLE `small_texts_temp_"+serverID+"`");
stmt2.close();
}
catch(Exception e){}
}
stmt.close();
// создаём список серверов, что уже обрабатываются
String servers = "0";
for(int i = 0; i < spyPool.length; i++)
if(spyPool[i] != null)
{
try{servers = servers + "," + spyPool[i].serverID;}
catch(Exception e){}
}
// берём сервер, который не проверяли уже в течении 8 часов и который по результатам работы сканера онлайн, так же сервер не должен уже находиться в обработке
stmt = c.createStatement();
rs = stmt.executeQuery("SELECT UID, host, port, user, pass, indexStart, indexEnd FROM servers WHERE ((((lastAttempt < UNIX_TIMESTAMP()-600)AND(indexStart < UNIX_TIMESTAMP()-600))AND(indexEnd < UNIX_TIMESTAMP()-57600))AND(UID NOT IN("+servers+")))AND(online = 1) ORDER BY lastAttempt");
if(!rs.next()) // если такого сервера нет - то чистка и выход
{
rs.close();
stmt.close();
eventTime();
spyPool[slotNum] = null;
return;
}
// заполняем данные о сервере
serverID = rs.getInt(1);
host = rs.getString(2);
port = rs.getInt(3);
user = rs.getString(4);
pass = rs.getString(5);
notFromHome = rs.getInt(6) < rs.getInt(7);
rs.close();
stmt.close();
// обновляем время последней попытки доступа
stmt = c.createStatement();
stmt.executeQuery("UPDATE servers SET lastAttempt = UNIX_TIMESTAMP() WHERE UID="+serverID);
stmt.close();
// если в прошлый раз индекс был не доведён до конца,
// то пытаемся найти на чём остановились и продолжить
if(notFromHome)
{
try
{
stmt = c.createStatement();
rs = stmt.executeQuery("SELECT path FROM dir_temp_index_"+serverID+" WHERE 1 ORDER BY -time LIMIT 0,1");
if(rs.next()) lastDir = rs.getString(1);
else notFromHome = false;
rs.close();
stmt.close();
}
catch(Exception e){notFromHome = false;}
}
if(!notFromHome)
{
// если есть прошлые темповые таблицы - удаляем их
try
{
stmt = c.createStatement();
stmt.executeQuery("DROP TABLE `dir_temp_index_"+serverID+"`");
stmt.close();
}
catch(Exception e){}
try
{
stmt = c.createStatement();
stmt.executeQuery("DROP TABLE `small_texts_temp_"+serverID+"`");
stmt.close();
}
catch(Exception e){}
// и создаём новые темповые таблицы
try
{
stmt = c.createStatement();
stmt.executeQuery("CREATE TABLE dir_temp_index_"+serverID+"(UID int(11) NOT NULL auto_increment,serverID int(11),serverText text,path text,crc varchar(32),name varchar(255),level int(11),time int(11),filenameList text,filesizeList text,dirSize varchar(255),dirText text,PRIMARY KEY (UID))TYPE=MyISAM;");
stmt.close();
}
catch(Exception e){}
try
{
stmt = c.createStatement();
stmt.executeQuery("CREATE TABLE small_texts_temp_"+serverID+" (UID int(11) NOT NULL auto_increment, crc VARCHAR(32), name varchar(255), content text, PRIMARY KEY (UID)) TYPE=MyISAM;");
stmt.close();
}
catch(Exception e){}
}
// создаём serverText
serverText = "ftp://" + host;
if(port != 21)
serverText = serverText + ":" + port;
// открываем лог
log = new FileOutputStream("logs/log."+serverID+".txt", false);
logOut(""+UIN+"---------------------\r\n"+serverText);
reconnections = 0;
// и запуск рекурсивного обработчика директорий
try
{
// попытка соединения с сервером, если неудачная - то чистим и уходим
if(!reconnect())
{
logOut("X connection to "+user+"@"+host+":"+port+" closed");
log.close();
eventTime();
spyPool[slotNum] = null;
return;
}
reconnections = 0;
// обновляем время старта индекса
stmt = c.createStatement();
stmt.executeQuery("UPDATE servers SET indexStart = UNIX_TIMESTAMP() WHERE UID="+serverID);
stmt.close();
// и пытаемся создать индекс
if(!spider(0, getDir(), "/", new long[255]))
{
// если не получилось - чистим и уходим
logOut("X connection to "+user+"@"+host+":"+port+" closed");
log.close();
eventTime();
spyPool[slotNum] = null;
return;
}
// если был установлен флаг продолжения индекса,
// но директория, с которой продолжать так и не была найдена, то
// удаляем темповые таблицы, чистим и уходим
if(notFromHome)
{
try
{
stmt = c.createStatement();
stmt.executeQuery("DROP TABLE `dir_temp_index_"+serverID+"`");
stmt.close();
}
catch(Exception e){}
try
{
stmt = c.createStatement();
stmt.executeQuery("DROP TABLE `small_texts_temp_"+serverID+"`");
stmt.close();
}
catch(Exception e){}
log.close();
eventTime();
spyPool[slotNum] = null;
return;
}
// если здесь, то всё прошло успешно, о чём и сообщаем
logOut("*> task "+user+"@"+host+":"+port+" complete");
// удаляем главную таблицу
try
{
stmt = c.createStatement();
stmt.executeQuery("DROP TABLE `dir_index_"+serverID+"`");
stmt.close();
}
catch(Exception e){}
try
{
stmt = c.createStatement();
stmt.executeQuery("DROP TABLE `small_texts_"+serverID+"`");
stmt.close();
}
catch(Exception e){}
// и переименовываем темповые таблицы в главные
stmt = c.createStatement();
stmt.executeQuery("ALTER TABLE `dir_temp_index_"+serverID+"` RENAME `dir_index_"+serverID+"`");
stmt.close();
stmt = c.createStatement();
stmt.executeQuery("ALTER TABLE `small_texts_temp_"+serverID+"` RENAME `small_texts_"+serverID+"`");
stmt.close();
// указываем, что индекс завершён
stmt = c.createStatement();
stmt.executeQuery("UPDATE servers SET indexEnd = UNIX_TIMESTAMP() WHERE UID="+serverID);
stmt.close();
// на всякий случай оптимизируем таблицы:
stmt = c.createStatement();
stmt.executeQuery("OPTIMIZE TABLE `user_associations`, `associations`, `servers`, `dir_index_"+serverID+"`, `small_texts_"+serverID+"`");
stmt.close();
}
catch(Exception e)
{logOut("X connection to "+user+"@"+host+":"+port+" closed\r\n"+e);}
log.close();
}
catch(Exception e){}
eventTime();
spyPool[slotNum] = null;
}
/**
* Функция для записи в OutputStream комманды и получения ответа из InputStream.
*
* @param str строка комманды, которую нужно записать в OutputStream
* @exception Exception на случай ошибок потоков или пустой строки.
*/
protected String write(String str) throws Exception
{
write(socket1.getOutputStream(), str);
return read(socket1.getInputStream());
}
/** 4 попытки каждые 10 секунд + 1 через 10 минут переконнектится к серверу */
protected boolean reconnect()
{
reconnections++;
if(reconnections == 2) return false;
eventTime();
for(int i = 0; i < 4; i++)
{
if(login()) return true;
logOut("[] sleep 10 sec.");
try{sleep(10000);}
catch(Exception e){};
}
logOut("[] sleep 10 min.");
try{sleep(600000);}
catch(Exception e){};
return login();
}
/**
* Вход на сервер.
* Неудачей соединения с сервером считается обрыв связи и непринятые user/pass.
* При удачном соединении производится попытка перейти в корень сервера (CD /).
*/
protected boolean login()
{
String dt = "";
int h = Calendar.getInstance().get(Calendar.HOUR_OF_DAY);
int m = Calendar.getInstance().get(Calendar.MINUTE);
if(h < 10) dt = "0"+h;
else dt = ""+h;
dt = dt+":";
if(m < 10) dt = dt+"0"+m;
else dt = dt+m;
logOut("** ["+dt+"]"+" connect to "+user+"@"+host+":"+port);
try
{
String reply;
socket1 = new Socket();
socket1.connect(new InetSocketAddress(host, port), 30000);
InputStream is = socket1.getInputStream();
OutputStream os = socket1.getOutputStream();
// read server "hello"
reply = write("NOOP\r\n");
if(reply.indexOf("220") == 0)
if(reply.indexOf("\n200") < 0) read(is);
// send user
reply = write("USER "+user+"\r\n");
if((reply.indexOf("4") == 0)||(reply.indexOf("5") == 0))
{
logOut("X fail to connect to "+user+"@"+host+":"+port);
return false;
}
// send pass
reply = write("PASS "+pass+"\r\n");
if((reply.indexOf("4") == 0)||(reply.indexOf("5") == 0))
{
logOut("X fail to connect to "+user+"@"+host+":"+port);
return false;
}
// set REPRESENTATION TYPE as ASCII
reply = write("TYPE A\r\n");
return true;
}
catch(Exception e){}
logOut("X fail to connect to "+user+"@"+host+":"+port);
return false;
}
/** Получение коммандой PWD текущей директории сервера. */
protected String getDir() throws Exception
{
String dir = write("PWD\r\n");
dir = dir.substring(dir.indexOf('"')+1);
return dir.substring(0, dir.indexOf('"'));
}
/**
* Получение коммандой PASV сокета пассивного соединения или null,
* если не поддерживается.
*/
protected Socket getPasv()
{
try
{
String reply = write("PASV\r\n");
while(reply.indexOf("2") != 0)
{
sleep(1000);
reply = write("PASV\r\n");
}
int s = reply.indexOf("(")+1, e = reply.indexOf(")", s);
reply = reply.substring(s, e);
s = reply.indexOf(",");
String host = reply.substring(0, s);
e = reply.indexOf(",", s+1);
host = host+"."+reply.substring(s+1, e);
s = reply.indexOf(",", e+1);
host = host+"."+reply.substring(e+1, s);
e = reply.indexOf(",", s+1);
host = host+"."+reply.substring(s+1, e);
s = reply.indexOf(",", e+1);
String port1 = reply.substring(e+1, s);
e = reply.indexOf(",", s+1);
String port2 = reply.substring(s+1);
int port = (Integer.parseInt(port1) << 8) | Integer.parseInt(port2);
Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port), 30000);
return socket;
}
catch(Exception e){}
return null;
}
/**
* получение листинга директории в активном режиме
* (это запасной вариант, на случай, если нельзя получить в пассивном)
*/
protected String getListActiveMode()
{
try
{
ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLocalHost());
String reply = write("PORT "+
ss.getInetAddress().getHostAddress().replace('.',',')+
","+((ss.getLocalPort()&0xff00)>>8)+
","+(ss.getLocalPort()&0xff)+"\r\n");
if(reply.indexOf("4") == 0) return null;
if(reply.indexOf("5") == 0) return null;
reply = write("LIST\r\n");
socket2 = ss.accept();
byte buf[] = read(socket2.getInputStream(), 100*1024);
int i = buf.length-1;
while((i > 1)&&(buf[i] == 0)) i--;
String answer = (new String(buf, 0, i)).trim()+"\n ";
socket2.close();
ss.close();
if(reply.indexOf("\n1") < 0)
if(reply.indexOf("\n2") < 0)
if(reply.indexOf("\n3") < 0)
if(reply.indexOf("\n4") < 0)
read(socket1.getInputStream());
return answer;
}
catch(Exception ex){}
return "";
}
/**
* Класс логически представляет файл на сервере, описание которого было полученно
* коммандой LIST.
*
* <p> Класс создаётся в функции Agent.getList().
* @see Agent#getList()
*/
protected class FTPFile
{
/** названние файла */
public String name;
/** размер файла */
public long size;
/** является ли файл директорией */
public boolean isDir;
/** Парсит строку, содержащую информацию о файле и заполняет поля класса. */
public FTPFile(String file)
{
if(file.trim().length() < 1)
{
isDir = true;
name = ".";
return;
}
file = file.trim();
isDir = file.charAt(0) == 'd';
if(!isDir) isDir = file.charAt(0) == 'D';
if(!isDir)
{
size = 0;
int i = file.indexOf(" ", 31);
if(i < 0) i = 43;
try{size = Long.parseLong(file.substring(30, i).trim());}
catch(Exception e1)
{
try{size = Long.parseLong(file.substring(30, 42).trim());}
catch(Exception e2)
{
try{size = Long.parseLong(file.substring(30, 41).trim());}
catch(Exception e3){}
}
}
}
else size = 0;
name = file.substring(54).trim();
}
}
/**
* Читает список текущей директории с помощью комманды LIST и возвращает список файлов.
* @return возвращает список файлов в виде массива класса FTPFile
* @see Agent.FTPFile
*/
synchronized protected FTPFile[] getList() throws Exception
{
String list, reply;
socket2 = getPasv();
if(socket2 != null)
{
reply = write("LIST\r\n");
byte buf[] = read(socket2.getInputStream(), 100*1024);
socket2.close();
int i = buf.length-1;
while((i > 1)&&(buf[i] == 0)) i--;
list = (new String(buf, 0, i)).trim()+"\n ";
if(reply.indexOf("\n1") < 0)
if(reply.indexOf("\n2") < 0)
if(reply.indexOf("\n3") < 0)
if(reply.indexOf("\n4") < 0)
read(socket1.getInputStream());
}
else
list = getListActiveMode();
int i;
int last = list.indexOf("\n"), c = 0;
while(last >= 0)
{
c++;
last = list.indexOf("\n", last+1);
}
if(c == 0) return null;
FTPFile files[] = new FTPFile[c];
last = list.indexOf("\n");
c = 0;
i = 0;
while(last >= 0)
{
try{files[i] = new FTPFile(list.substring(c, last));}
catch(Exception e){files[i] = new FTPFile("");}
c = last;
last = list.indexOf("\n", last+1);
i++;
}
return files;
}
/**
* простенькая функция, добавляющая c-like слэши для символов \,',",{,}
* и так далее, используется для добавления строк в SQL
*/
protected String addSlashes(String text)
{
StringBuffer sb = new StringBuffer(text.length()*2);
for(int i = 0; i < text.length(); i++)
{
if(text.charAt(i) == '"') sb.append("\\\"");
else if(text.charAt(i) == '\'') sb.append("\\\'");
else if(text.charAt(i) == '\\') sb.append("\\\\");
else if(text.charAt(i) == '\n') sb.append("\\n");
else if(text.charAt(i) == '{') sb.append("-");
else if(text.charAt(i) == '}') sb.append("-");
else if(text.charAt(i) == '\r');
else sb.append(text.charAt(i));
}
return sb.toString();
}
/**
* считает CRC32 код строки
* (в этой версии программы используется уже не CRC32 алгоритм,
* а Adler32 - как более быстрый, но название осталось старое)
*/
protected long getCRC32(String str)
{
Adler32 crc = new Adler32();
crc.update(str.getBytes());
return crc.getValue();
}
/**
* Функция рекурсивно перебирает директории и вносит их содержимое в БД.
*
* <p> Максмальная глубина просмотра директории - 255.
* При завершении обработки директории функция пытается выполнить комманду CDUP.
* При обрыве связи предпринимаются 4 попытки реконнекта, после чего - завершение функции.
*
* <p> При обработке файлов, выделяются файлы с расширениями .txt .diz .nfo
* и если они менее 20 кб, то вызывается класс Associator, который их обрабатывает отдельно.
*
* @param level глубина просмотра от корневой директории
* @param dir директория (полный путь, path), которую предлагается обработать серверу.
* @param dirname имя директории, которую предлагается обработать серверу.
* @param crc_list список CRC содержимого всех родительских директорий, используется, чтобы избежать зацикливания
* @see Agent.Associator
* @see Agent.Spy
*/
protected boolean spider(int level, String dir, String dirname, long[] crc_list) throws Exception
{
// максимальная глубина сканирования - 255 (это в основном связано с
// реализайцией рекурсиии в Java - если поставить больше, то некоторые
// JVM выдадут ошибку)
if(level >= 255) return true;
// попытка выбрать нужную директорию, если не получается (например нет прав доступа)
// то возвращает ошибку
String reply = write("CWD "+dir+"\r\n");
if(reply.indexOf("3") == 0) return false;
if(reply.indexOf("4") == 0) return false;
if(reply.indexOf("5") == 0) return false;
dir = getDir();
if(dir.lastIndexOf("/") != dir.length()-1) dir = dir + "/";
// создание листа
FTPFile list[] = getList();
if(list == null)
if(reconnect())
list = getList();
if(list == null) return true;
String fileList = "", fileSize = "", dirList = "";
long dirSize = 0;
int textFiles = 0;
for(int i = 0; i < list.length; i++)
{
if(!list[i].isDir)
{
dirList = dirList+list[i].name+"\n"+list[i].size+"\n";
fileList = fileList+list[i].name+"\n";
fileSize = fileSize+list[i].size+"\n";
dirSize += list[i].size;
if(textFiles < 5)
{
String nm = list[i].name.toLowerCase();
if( nm.endsWith(".txt")&&
((list[i].size <= 8*1024)&&
(list[i].size >= 20)))
textFiles++;
}
}
else
dirList = dirList+list[i].name+"\nD\n";
}
// проверка содержимого на повторы по CRC32 от содержимого:
long crc = getCRC32(dirList);
for(int i = 0; i < level; i++)
if(crc_list[i] == crc) return true;
// начало обрабработки директории
crc_list[level] = crc;
crc = getCRC32(dir);
logOut("|> "+dir);
// режим "не с начала" - доиндекс сервера
if(notFromHome)
{
// проверяем, совпадает ли имя текущей директории с именем директории,
// откуда нужно производить доиндекс
notFromHome = !lastDir.equals(dir);
// если не совпадает, то проверяем, есть ли в пути директории, откуда нужно
// производить доиндекс пути до текущей директории, и если нету - выходим
// (нужно чтобы не сканировать рекурсивно лишние директории)
if(notFromHome)
{
String dr = "/";
String dn[] = lastDir.split("/");
if(dn.length < level) return true;
for(int i = 0; i < level; i++)
dr = dr + dn[i] + "/";
if(!dr.equals(dir)) return true;
}
else
{
// если начиная с этой директории нужно рпоизводить доиндекс,
// то вначале удаляем эту директорию из БД, чтоб избежать повторов
try
{
Statement stmt = c.createStatement();
stmt.executeQuery("DELETE FROM dir_temp_index_"+serverID+" WHERE path LIKE '"+addSlashes(dir)+"'");
stmt.close();
}
catch(Exception e){}
try
{
Statement stmt = c.createStatement();
stmt.executeQuery("DELETE FROM small_texts_temp_"+serverID+" WHERE crc="+crc+"'");
stmt.close();
}
catch(Exception e){}
}
}
// если уже режим индекса, а не поиска доиндекса, то добавляем директорию в лист
if(!notFromHome)
{
try
{
Statement stmt = c.createStatement();
stmt.executeQuery("INSERT INTO dir_temp_index_"+serverID+" VALUES(NULL, "+serverID+", '"+serverText+"', '"+addSlashes(dir)+"', '"+crc+"', '"+addSlashes(dirname)+"', "+level+", UNIX_TIMESTAMP(), '"+addSlashes(fileList)+"', '"+addSlashes(fileSize)+"', '"+dirSize+"', '')");
stmt.close();
}
catch(Exception e){return false;}
}
// обработка списка файлов и директорий
for(int i = 0; i < list.length; i++)
{
if(list[i].isDir) // если директория
{
if(!list[i].name.equals("."))
if(!list[i].name.equals(".."))
{
// и если это не ссылка на родительскую или текущую, то
// две попытки рекурсивного скана
try
{
if(spider(level+1, dir+list[i].name, list[i].name, crc_list))
reconnections = 0;
}
catch(Exception e)
{
try
{
if(!reconnect()) return false;
if(spider(level+1, dir+list[i].name, list[i].name, crc_list))
reconnections = 0;
else
return false;
}
catch(Exception ex){return false;}
}
}
}
else if(!notFromHome)
{
// если это файл и уже режим индекса и файл менее 8 кб и заканчивается на
// .nfo или .diz, или он заканчивается на .txt, меньше 8 кб и таких файлов
// в директории менее 5 штук, то запускаем ассоциатор
String nm = list[i].name.toLowerCase();
if( (nm.endsWith(".diz") ||
nm.endsWith(".nfo"))&&
((list[i].size <= 8*1024)&&
(list[i].size >= 20)))
new Associator(list[i].name, dir, crc, list[i].size);
else if(((textFiles < 5)&&
(nm.endsWith(".txt")))&&
((list[i].size <= 8*1024)&&
(list[i].size >= 20)))
new Associator(list[i].name, dir, crc, list[i].size);
}
}
return true;
}
/**
* класс скачивает переданный ему файл и ассоциирует его содержимое с
* директорией, в которой этот файл находится
*/
protected class Associator extends Thread
{
String name, dir;
long crc;
long size;
byte buf[];
/**
* скачивает файл и запускает новый вычислительный поток,
* в котором уже и ассоциирует */
public Associator(String name, String dir, long crc, long size)
{
this.name = name;
this.dir = dir;
this.crc = crc;
this.size = size;
try
{
logOut("|> get: "+dir+name);
// вначале пробуем скачать в пассивном режиме
Socket socket = getPasv();
if(socket != null)
{
String reply = write("RETR "+dir+name+"\r\n");
buf = read(socket.getInputStream(), (int) size);
socket.close();
if(reply.indexOf("\n1") < 0)
if(reply.indexOf("\n2") < 0)
if(reply.indexOf("\n3") < 0)
if(reply.indexOf("\n4") < 0)
read(socket1.getInputStream());
}
else // если не получилось в пассивном - попытка скачать в активном
{
ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLocalHost());
String reply = write("PORT "+
ss.getInetAddress().getHostAddress().replace('.',',')+
","+((ss.getLocalPort()&0xff00)>>8)+
","+(ss.getLocalPort()&0xff)+"\r\n");
if(reply.indexOf("4") == 0) return;
if(reply.indexOf("5") == 0) return;
reply = write("RETR "+dir+name+"\r\n");
Socket socket2 = ss.accept();
buf = read(socket.getInputStream(), (int) size);
socket2.close();
ss.close();
if(reply.indexOf("\n1") < 0)
if(reply.indexOf("\n2") < 0)
if(reply.indexOf("\n3") < 0)
if(reply.indexOf("\n4") < 0)
read(socket1.getInputStream());
}
// в случае успешной закачки - запуск вычислительного потока для обработки
setDaemon(false);
start();
}
catch(Exception e){}
}
/**
* обраюатывает файл и ассоциирует его с директорией
*/
public void run()
{
try
{
int i;
String nm = name.toLowerCase();
// DOS -> WIN
if(nm.endsWith(".nfo") || nm.endsWith(".diz"))
{
int win[] = {
0xC0,0xC1,0xC2,0xC3,0xC4,0xC5,0xA8,0xC6,0xC7,
0xC8,0xC9,0xCA,0xCB,0xCC,0xCD,0xCE,0xCF,0xD0,
0xD1,0xD2,0xD3,0xD4,0xD5,0xD6,0xD7,0xD8,0xD9,
0xDA,0xDB,0xDC,0xDD,0xDE,0xDF,0xE0,0xE1,0xE2,
0xE3,0xE4,0xE5,0xB8,0xE6,0xE7,0xE8,0xE9,0xEA,
0xEB,0xEC,0xED,0xEE,0xEF,0xF0,0xF1,0xF2,0xF3,
0xF4,0xF5,0xF6,0xF7,0xF8,0xF9,0xFA,0xFB,0xFC,
0xFD,0xFE,0xFF};
int dos[] = {
0x80,0x81,0x82,0x83,0x84,0x85,0xF0,0x86,0x87,
0x88,0x89,0x8A,0x8B,0x8C,0x8D,0x8E,0x8F,0x90,
0x91,0x92,0x93,0x94,0x95,0x96,0x97,0x98,0x99,
0x9A,0x9B,0x9C,0x9D,0x9E,0x9F,0xA0,0xA1,0xA2,
0xA3,0xA4,0xA5,0xF1,0xA6,0xA7,0xA8,0xA9,0xAA,
0xAB,0xAC,0xAD,0xAE,0xAF,0xE0,0xE1,0xE2,0xE3,
0xE4,0xE5,0xE6,0xE7,0xE8,0xE9,0xEA,0xEB,0xEC,
0xED,0xEE,0xEF};
for(i = 0; i < buf.length; i++)
for(int j = 0; j < win.length; j++)
if(buf[i] == (byte)dos[j]) buf[i]= (byte)win[j];
}
String text = (new String(buf, 0, buf.length)).trim()+"\n ";
// кладу файл в копилку
Statement stmt;
try
{
stmt = c.createStatement();
stmt.executeQuery("INSERT INTO small_texts_temp_"+serverID+" VALUES(NULL, '"+crc+"', '"+addSlashes(name)+"', '"+addSlashes(text)+"')");
stmt.close();
}
catch(Exception e){logOut("error1: "+serverID+" - "+name);}
text = ((new String(buf, 0, buf.length)).trim()+"\n ").toLowerCase();
// заменяем ненужные символы на пробелы
char un[] = {
0x09,0x0A,0x0B,0x0C,0x0D,0x1C,0x1D,0x1E,0x1F,'~','`',
'!','@','#','$','%','^','&','*','(',')','-','_','=',
'+','|','\\','/','?','<','>',',','.','{','}','[',']',
'\'','"',':',';','0','1','2','3','4','5','6','7','8',
'9'};
for(i = 0; i < un.length; i++)
text = text.replace(un[i], ' ');
text = ' '+text+' ';
// удаляем "длинные" пробелы и короткие (меньше 4х символов) слова
StringBuffer sb = new StringBuffer(text.length()*2);
sb.append(' ');
int word;
i = 0;
while(i < text.length()-1)
{
word = 0;
while((i+word < text.length()-1)&&(text.charAt(i+word) != ' ')) word++;
if(word > 3)
{
sb.append(text.substring(i, i+word));
sb.append(' ');
}
i += word;
while((i < text.length()-1)&&(text.charAt(i) == ' ')) i++;
}
text = sb.toString();
// разбиваем директорию на части по символу /
String dn[] = dir.split("/");
String da[] = {""+UIN, ""+UIN, ""+UIN};
int j = 0;
for(i = dn.length-1;(i > dn.length-3-1)&&(i > 0); i--)
{
da[j] = addSlashes(dn[i]);
j++;
}
// вносим ассоциацию
stmt = c.createStatement();
stmt.executeQuery("INSERT INTO associations VALUES(NULL, '"+crc+"', "+UIN+", '"+addSlashes(text)+"', '"+da[0]+"', '"+da[1]+"', '"+da[2]+"')");
stmt.close();
// удаляем устаревшие ассоциации
stmt = c.createStatement();
stmt.executeQuery("DELETE FROM associations WHERE crc='"+crc+"' AND AgentID != "+UIN);
stmt.close();
// обновляем ассоциацию с директорией
stmt = c.createStatement();
stmt.executeQuery("UPDATE dir_temp_index_"+serverID+" SET dirText = CONCAT(dirText, ' "+text+"') WHERE crc='"+crc+"'");
stmt.close();
}
catch(Exception e){logOut("error2: "+serverID+" - "+name);}
}
}
}
}