代码"/>
文件监控(二) 代码
文件监控(二) 代码 ,目前监控只支持WINDOWS
//①开始监控目录
// 将监控到的文件放入list
void* ThreadWatcher(void* pParam)
{
dzlog_notice("[线程开启]开始监控目录 ThreadWatcher PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
Cwx_kk_upDlg * dlg = (Cwx_kk_upDlg*)pParam;
char notify[4096]={0};
DWORD cbBytes = 0;
TCHAR errorInfo[TF_MAX_PATH_LEN]={0};
FILE_NOTIFY_INFORMATION *pNotify=(FILE_NOTIFY_INFORMATION *)notify;
// **** 重要 ****
// *** 在CreateFile时指定FILE_FLAG_OVERLAPPED标志 \
ReadDirectoryChangesW时使用lpOverlapped参数
OVERLAPPED ov;//通知线程退出 ReadDirectoryChangesW
memset(&ov, 0, sizeof(ov));
ov.hEvent = CreateEvent(NULL, false, NULL, NULL);
// GetCurrentDirectory(MAX_PATH,path.GetBuffer(MAX_PATH+1));
CString str;
WCHAR wcFileName[TF_MAX_PATH_LEN]={0};
DWORD dwFileNameLength ;
bool bDeleteFileName = true;
char psTemp[TF_MAX_PATH_LEN]={0};
long lRet;
// SetEvent(eventStarted);//release signal
while (TRUE)
{
if(m_ExitCode==1)
goto end;
char *cFileName = new char[TF_MAX_PATH_LEN];
memset(cFileName, 0 , TF_MAX_PATH_LEN);
memset(wcFileName, 0 , TF_MAX_PATH_LEN);
bDeleteFileName = true;
//for (i=0;i<numDirs;i++)
lRet = ReadDirectoryChangesW( dlg->kkConfig.hDir, ¬ify, sizeof(notify),
true, FILE_NOTIFY_CHANGE_LAST_WRITE,
&cbBytes,0, NULL);
if(lRet)
{
memcpy( wcFileName, pNotify->FileName, pNotify->FileNameLength );
WideCharToMultiByte( CP_ACP, 0, wcFileName, -1, cFileName, TF_MAX_PATH_LEN, NULL, NULL );
sprintf( psTemp, "%s/%s", dlg->kkConfig.ftpPath, cFileName );
if(FileUtil::FindFirstFileExists( psTemp, FILE_ATTRIBUTE_DIRECTORY))
continue;
switch(pNotify->Action)
{
case FILE_ACTION_ADDED:
str.Format("Directory/File added - %s",cFileName);
break;
case FILE_ACTION_REMOVED:
str.Format("Directory/File removed - %s",cFileName);
break;
case FILE_ACTION_MODIFIED:
// WaitForSingleObject(hMutex,-1);//waiting
dlg->m_images.push( cFileName );
// ReleaseMutex(hMutex);//release
str.Format("Directory/File modified - %s",cFileName);
bDeleteFileName = false;
break;
case FILE_ACTION_RENAMED_OLD_NAME:
str.Format("Directory/File rename - %s",cFileName);
break;
case FILE_ACTION_RENAMED_NEW_NAME:
str.Format("Directory/File new name - %s",cFileName);
break;
default:
break;
}
}
dzlog_notice(str);
if(bDeleteFileName)
delete cFileName;
}
end:
sprintf(errorInfo, "[线程退出]开始监控目录 ThreadWatcher PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
dzlog_notice("%s",errorInfo);
CloseHandle(dlg->kkConfig.hDir);
pthread_exit(errorInfo);
return 0;
}
//通过图片名称获取车牌等信息
long ParseVehicleFromPicture(KKConfig kkConfig, char *imagePath, VehicleInfo &vehicleInfo)
{
// 图片的命名为: 时间_车牌_号牌种类_车牌颜色_车辆速度.bmp
// 如: 2014-05-04_16.35.48_京N56Y22_02_2_0.bmp" "2014-05-04_16.35.48_京N56Y22_02_2_0_moreInfo.bmp"
//根据图片名称获取图片的时间信息, FORMAT 日期时间_车牌_车牌颜色_车速_违章类型 eg: 2014-1-4_15.02.18_location.jpg
char fileName[256]={0};
long lTime=0;
sprintf( fileName, "%s", (strrchr(imagePath,'\\')+1) );
//fileName 0x0aa8f6f8 "20140504162515_京N56Y22_02_2_0.bmp" char [256]
char *p = strstr(fileName, "_");
char *pre=0;
char DateTime[32]={0};
char *pDateTime = DateTime;
try{
// Date
memcpy( pDateTime, fileName, p-fileName);
pDateTime += (p-fileName);
*pDateTime = ' ';
pDateTime ++;
//Time
pre = p+1;
p = strstr(pre, "_");
memcpy( pDateTime, pre, p-pre);
pre = pDateTime;
while( *pre!='\0'){
if( *pre == '.')
*pre = ':';
pre++;
}
}catch(...){
dzlog_error("解析[日期时间失败:%s]", fileName);
}
//plate
char plate[32]={0};
try{
pre = p+1;
p = strstr(pre, "_");
memcpy(plate, pre, p-pre);
}catch(...){
dzlog_error("解析[号牌失败:%s]", fileName);
}
//hpzl 号牌种类
char hpzl[8]={0};
try{
pre = p+1;
p = strstr(pre, "_");
memcpy(hpzl, pre, p-pre);
}catch(...){
dzlog_error("解析[号牌种类失败:%s]", fileName);
}
//车牌颜色
char hpys[8]={0};
try{
pre = p+1;
p = strstr(pre, "_");
memcpy(hpys, pre, p-pre);
}catch(...){
dzlog_error("解析[车牌颜色失败:%s]", fileName);
}
// ==================================================
// VehicleInfo *vehicleInfo = new VehicleInfo();
vehicleInfo.cdh = 1;//车道号
sprintf(vehicleInfo.kkbh, kkConfig.id);//卡口编号
sprintf(vehicleInfo.fxlx, kkConfig.direction);//方向类型
sprintf(vehicleInfo.hphm, plate ); //车牌 无牌、未识别、无法识别均用半角“-”表示,其中无号牌要注意hpzl填41
sprintf(vehicleInfo.hpzl, hpzl );//号牌种类 参考GA24.7(如01-大型汽车,02-小型汽车,25-农机号牌,41-无号牌,42-假号牌,99-其他号牌),不能为空;
sprintf(vehicleInfo.hpys, hpys );//号牌颜色 0-白色,1-黄色,2-蓝色,3-黑色,4-绿色,9-其它颜色,不能为空
sprintf(vehicleInfo.gcsj, DateTime ); //过车时间,e.g: "2003-09-11 11:07:23"
vehicleInfo.clsd = 0;//车辆速度 最长3位,单位:公里/小时
vehicleInfo.clxs = kkConfig.cdSpeedLimit;//车辆限速 最长3位,单位:公里/小时
sprintf(vehicleInfo.wfdm, "1"); //违章类型 违章行为编码 参考GA408.1
vehicleInfo.cwkc = 0;//车外廓长 最长5位,以厘米为单位
sprintf(vehicleInfo.cllx, "K33");//车辆类型 参考GA24.4(K11-大型普通客车,K21-中型普通客车,K31-小型普通客车,K33-轿车,H11-重型普通客车,H21-中型普通客车,M22-轻便二轮摩托车)
sprintf(vehicleInfo.fzhphm, "-");//辅助号牌号码 无牌、未识别、无法识别均用半角“-”表示,其中无号牌要注意hpzl填41
sprintf(vehicleInfo.csys, "-");//车身颜色
sprintf(vehicleInfo.tplj,"%s/", kkConfig.httpPath);//通行图片路径 固定部分
sprintf(vehicleInfo.tp1, "%s",imagePath);//通行图片1 变化的部分
return 1;
}
//②处理监控到的数据
// 获取车牌等信息、检查数据库中是否存在该图片,若存在则继续处理下一个,若不存在则复制图片从ftp到http,写入数据库,删除ftp下的文件
void* ThreadProcessWatchedFiles(void* pParam)
{
dzlog_notice("[线程开启]处理监控到的数据 ThreadProcessWatchedFiles PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
Cwx_kk_upDlg * dlg = (Cwx_kk_upDlg*)pParam;
TCHAR errorInfo[TF_MAX_PATH_LEN]={0};
char *imagePath = 0;
char ftpFilePath[512]={0};
char ftpPlateFilePath[512]={0};
char httpFilePath[512]={0};
char httpPlateFilePath[512]={0};
bool bExist = false;
char temp[512]={0};
long lRet=0;
char errorValue[512]={0};
VehicleInfo vehicleInfo={0};
TFDB db;
while(true){
if(m_ExitCode==1)
goto end;
if(dlg->m_images.size()<1){
dlg->GetDlgItem( IDC_STATUS )->SetWindowText("空闲等待");
Sleep(10);
continue;
}
sprintf(temp, "正在处理文件队列,剩余文件数量: %d ", dlg->m_images.size() );
dlg->GetDlgItem( IDC_STATUS )->SetWindowText(temp);
if(dlg->m_images.empty())
continue;
imagePath = dlg->m_images.front();
dlg->m_images.pop();
if(imagePath==0)
continue;
if(strrstr(imagePath, "_plate.bmp")){
try{
delete imagePath;
}catch(...){ }
continue;
}
sprintf(ftpFilePath, "%s\\%s", dlg->kkConfig.ftpPath, imagePath);// 过车特写图FTP
sprintf(httpFilePath, "%s\\%s", dlg->kkConfig.httpPath, imagePath);// 过车特写图HTTP
sprintf(ftpPlateFilePath, "%s\\%s_plate.bmp", dlg->kkConfig.ftpPath, imagePath);// 车牌图片FTP
sprintf(httpPlateFilePath, "%s\\%s_plate.bmp", dlg->kkConfig.httpPath, imagePath);//车牌图片HTTP
//check is file
if( !FileUtil::FindFirstFileExists( ftpFilePath, FILE_ATTRIBUTE_DIRECTORY) )
{
//check can access
if(_access(ftpFilePath, R_OK) == 0){
lRet = ParseVehicleFromPicture(dlg->kkConfig, imagePath, vehicleInfo);//通过图片名称获取车牌等信息
if(lRet != 1)
continue;
bExist = db.CheckImageExist(imagePath);//检查文件是否已经分析过,不存在则写入
if( ! bExist){
lRet = db.Add(&vehicleInfo);//写入数据库
if( lRet == true)
{
CopyFile:
// 过车特写图
lRet = FileUtil::CopyFileEx(ftpFilePath, httpFilePath, true);
if(lRet==true){
dzlog_error("Copyed file from [%s] to [%s]", ftpFilePath, httpFilePath);
DeleteFile(ftpFilePath);//处理完成后删除FTP下的文件
}else{
dzlog_error("Copy file failed [GetLastError %s] from [%s] to [%s]", GetLastErrorInfo() , ftpFilePath, httpFilePath);
}
CopyPlateFile:
//车牌图片
lRet = FileUtil::CopyFileEx(ftpPlateFilePath, httpPlateFilePath, true);
if(lRet==true){
dzlog_error("Copyed file from [%s] to [%s]", ftpFilePath, httpFilePath);
DeleteFile(ftpPlateFilePath);//处理完成后删除FTP下的文件
}else{
dzlog_error("Copy file failed [GetLastError %s] from [%s] to [%s]", GetLastErrorInfo() , ftpFilePath, httpFilePath);
}
}
}else{
if(FileUtil::FindFirstFileExists( httpFilePath, false))
DeleteFile(ftpFilePath);//处理完成后删除FTP下的文件
else
goto CopyFile;
if(FileUtil::FindFirstFileExists( httpPlateFilePath, false))
DeleteFile(ftpPlateFilePath);//处理完成后删除FTP下的文件
else
goto CopyPlateFile;
}
// flock();
}else{
dzlog_error("cannot access file [%s] [GetLastError %s] ", ftpFilePath, GetLastErrorInfo());
}
}
try{
if(imagePath)
delete imagePath;
}catch(...){ }
Sleep(10);
}
end:
sprintf(errorInfo, "[线程退出]处理监控到的数据 ThreadProcessWatchedFiles PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
dzlog_notice("%s",errorInfo);
pthread_exit(errorInfo);
return 0;
}
//③上传数据
// 读取数据库,获取未上传的图片进行上传,如果http下的图片文件不存在,则读取ftp下的图片并进行上传
void* ThreadUpload(void* pParam)
{
dzlog_notice("[线程开启]上传数据 ThreadUpload PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
Cwx_kk_upDlg * dlg = (Cwx_kk_upDlg*)pParam;
TCHAR errorInfo[TF_MAX_PATH_LEN]={0};
queue<VehicleInfo*> listIn;
VehicleInfo *vehicleInfo = 0;
long lRet = 0;
int id = 0;
bool connected = false;
TFDB db;
while(true){
db.Query(listIn);// 查询未上传的数据
next:
if(m_ExitCode==1)
goto end;
if(listIn.empty()){
Sleep(100);
continue;
}
vehicleInfo = listIn.front();
listIn.pop();
if(vehicleInfo==0)
goto next;
connect:
if( !connected )
lRet = dlg->InitTrans(vehicleInfo);//初始化连接,注册车道
if( lRet == CONNECT_ERROR){
dzlog_error("网络连接失败") ;
connected = false;
Sleep(2000);
goto connect;
}
connected = true;
id = vehicleInfo->id;
lRet = dlg->UploadInfo(vehicleInfo);//上传数据
if(vehicleInfo)
delete vehicleInfo;
if( lRet == OK)
db.Uploaded( id );//上传成功
else{
//上传失败
if( lRet == FALIED) { //连接成功但是上传失败
dzlog_error("连接成功但是上传失败 @ id=%d ", id) ; // to do something
}else if( lRet == CONNECT_ERROR){
dzlog_error("网络连接失败 @ id=%d ", id) ;
connected = false;
Sleep(100);
goto connect; //重新连接
}
}
goto next;
Sleep(100);
}
end:
sprintf(errorInfo, "[线程退出]上传数据 ThreadUpload PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
dzlog_notice("%s",errorInfo);
pthread_exit(errorInfo);
return 0;
}
#define MAX_THREAD 10
pthread_t thread[MAX_THREAD]={0};
void* pth_join_ret[MAX_THREAD];
/* =========================================================================
* 余留文件处理线程
* 处理上次退出后FTP未处理的文件
* 若程序死掉了,自己重启后首先处理上次余留的FTP上传的文件
* 1.用FileUtil::ListFiles获取FTP目录下未处理的文件
* 2.放入list队列
===========================================================================*/
void* ThreadProcessLast(void *pParam)
{
dzlog_notice("[线程开启]余留文件处理线程 ThreadProcessLast PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
Cwx_kk_upDlg * dlg = (Cwx_kk_upDlg*)pParam;
TCHAR errorInfo[TF_MAX_PATH_LEN]={0};
list<char *> list;
char *imagePath = 0;
while(true){
if(m_ExitCode==1)
goto end;
FileUtil::ListFiles(dlg->kkConfig.ftpPath, NULL, list, dlg->kkConfig.fileExt, 1, false, true);
while( !list.empty()){
if(m_ExitCode==1)
goto end;
imagePath = list.front();
list.pop_front();
if(imagePath!=0 && dlg->m_images.size()<500)
dlg->m_images.push(imagePath);
}
Sleep(1000);
}
end:
sprintf(errorInfo, "[线程退出]余留文件处理线程 ThreadProcessLast PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
dzlog_notice("%s",errorInfo);
pthread_exit(errorInfo);
return 0;
}
//系统资源监控
void* ProcessMonitorThread(void *pParam)
{
start:
dzlog_notice("[线程开启]系统资源监控线程 ThreadProcessLast PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
Cwx_kk_upDlg * dlg = (Cwx_kk_upDlg*)pParam;
TCHAR errorInfo[TF_MAX_PATH_LEN]={0};
char temp[512]={0};
uint64_t mem=0;//内存使用
uint64_t vmem=0;//虚拟内存使用
int cpu =0;
int ret=0;
char tempsize[64]={0};
try
{
while( true )
{
if(m_ExitCode==1)
goto end;
cpu = get_cpu_usage();
ret = get_memory_usage( &mem, &vmem);
if( cpu>=0)
sprintf(temp, "CPU: %d%% ", cpu );
if( ret>=0){
sprintf(temp, "%s Memery: %s | %s ",temp, SizeFormat(mem, tempsize) , SizeFormat(vmem, tempsize) );
}
sprintf(errorInfo, "系统资源 ProcessMonitorThread [%s] %s ",temp, GetLastErrorInfo());
dzlog_info("%s", errorInfo);
dlg->GetDlgItem(ID_PROCESS_STATE)->SetWindowText(temp);
Sleep(1000);
}
}catch(...){
sprintf(errorInfo, "[线程退出]系统资源监控线程 ProcessMonitorThread PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
dzlog_notice("%s",errorInfo);
goto start;
}
end:
sprintf(errorInfo, "[线程退出]系统资源监控线程 ProcessMonitorThread PID : 0x%x GetLastError(%s)", pthread_self().p, GetLastErrorInfo());
dzlog_notice("%s",errorInfo);
pthread_exit(errorInfo);
return 0;
}
//开始工作线程
void * ThreadStartAll(void *pParam)
{
TCHAR errorInfo[TF_MAX_PATH_LEN]={0};
threadRuning = !threadRuning;
Cwx_kk_upDlg * dlg = (Cwx_kk_upDlg*)pParam;
CMenu *pMenu = AfxGetApp()->m_pMainWnd->GetMenu();
CMenu *pSubMenu = pMenu->GetSubMenu(2);
dzlog_notice("======================================================================");
if(threadRuning){
dzlog_notice("开始工作线程 StartWatch");
m_ExitCode = 0;
pthread_create(&thread[0], NULL, ThreadProcessLast, pParam);//余留文件处理线程
pthread_create(&thread[1], NULL, ThreadWatcher, pParam); //开启监控目录线程
pthread_create(&thread[2], NULL, ThreadProcessWatchedFiles, pParam); //开启处理监控到的数据处理线程
pthread_create(&thread[3], NULL, ThreadUpload, pParam); //开启数据上传线程
pthread_create(&thread[4], NULL, ProcessMonitorThread, pParam); //系统资源监控
//AfxBeginThread(ThreadWatcher, this);//开启监控目录线程
//AfxBeginThread(ThreadProcessWatchedFiles, this);//开启处理监控到的数据处理线程
//AfxBeginThread(ThreadUpload, this);//开启数据上传线程
pSubMenu->ModifyMenu(ID_START_WATCH, MF_BYCOMMAND ,ID_START_WATCH,"关闭监控");
}else{
dzlog_notice("关闭工作线程 OnStartWatch");
m_ExitCode = 1;
Sleep(500);
pthread_join( thread[0], &pth_join_ret[0]);
dzlog_notice("关闭线程 thread0[余留文件处理线程] retruns : %s ", pth_join_ret[0]);
pthread_join( thread[1], &pth_join_ret[1]);
dzlog_notice("关闭线程 thread1[监控目录线程] retruns : %s ", pth_join_ret[1]);
pthread_join( thread[2], &pth_join_ret[2]);
dzlog_notice("关闭线程 thread2[数据处理线程] retruns : %s ", pth_join_ret[2]);
pthread_join( thread[3], &pth_join_ret[3]);
dzlog_notice("关闭线程 thread3[数据上传线程] retruns : %s ", pth_join_ret[3]);
pthread_join( thread[4], &pth_join_ret[4]);
dzlog_notice("关闭线程 thread4[系统资源监控线程] retruns : %s ", pth_join_ret[4]);
//
pSubMenu->ModifyMenu(ID_START_WATCH, MF_BYCOMMAND ,ID_START_WATCH,"开启监控");
}
sprintf(errorInfo, "[线程退出] 开始工作线程 ThreadStartAll PID : 0x%x ", pthread_self() );
pthread_exit(errorInfo);
return 0;
}
//开始监控目录
void Cwx_kk_upDlg::OnStartWatch()
{
//
if( !threadRuning){
FileUtil::CreateFolders(kkConfig.ftpPath);
FileUtil::CreateFolders(kkConfig.httpPath);
kkConfig.hDir = CreateFile( kkConfig.ftpPath ,
FILE_LIST_DIRECTORY,
FILE_SHARE_READ |
FILE_SHARE_WRITE |
FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
NULL);
if( kkConfig.hDir == INVALID_HANDLE_VALUE )
{
sprintf(errorInfo, "打开监控目录失败[%s][%s]",kkConfig.ftpPath, GetLastErrorInfo() );
dzlog_error(errorInfo);
MessageBox(errorInfo);
return ;
}
}
pthread_t thread;
pthread_create(&thread, NULL, ThreadStartAll, this); //开启工作线程
}
更多推荐
文件监控(二) 代码
发布评论