用 Java 实现 Syslog 功能

编程入门 行业动态 更新时间:2024-10-24 00:31:23

用 Java 实现 Syslog <a href=https://www.elefans.com/category/jswz/34/1771378.html style=功能"/>

用 Java 实现 Syslog 功能

1、业务场景

用一个 Spring Boot 的项目去实现对管控设备的监控、日志收集等。同时需要将接收到的日志进行入库,每天存一张表,如device_log_20231026…


2、Syslog客户端(接收日志的服务器,即运行Java程序的服务器)

2.1 导包

Syslog需要用到的jar包:

<dependency><groupId>org.graylog2</groupId><artifactId>syslog4j</artifactId><version>0.9.61</version>
</dependency>

Mybatis Plus 需要的jar包:(因为笔者此处使用MP进行数据库的交互)

<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.2</version>
</dependency>

2.2 Syslog 接收端

数据库实体类 DeviceSyslog.java

package com.haitai.domain.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;import java.io.Serializable;
import java.util.Date;/*** 设备syslog** @Author xincheng.du* @Date 2023/10/24 13:50*/
@Data
@TableName("device_log")
public class DeviceSyslog implements Serializable {/*** 主键id*/@TableId(type = IdType.AUTO)private Long id;/*** 设备ip*/@TableField("ip")private String ip;/*** 日志信息*/@TableField("message")private String message;/*** 日志级别*/@TableField("level")private Integer level;/*** 日志时间*/@TableField("log_time")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private Date logTime;/*** 入库时间*/@TableField("create_time")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private Date createTime;}

DeviceSyslogMapper.java

package com.haitai.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.haitai.domain.entity.DeviceSyslog;/*** 设备syslog Mapper** @Author xincheng.du* @Date 2023/5/25 14:10*/
public interface DeviceSyslogMapper extends BaseMapper<DeviceSyslog> {
}

SyslogUdpUtil.java
此处的 IP 和 Port 填写你运行Java程序的 IP 和你需要开放的端口。因为我的接收端和发送端在同一个局域网,所以我此处是 192.168.110.28 ,端口我此处设置为 3780.

package com.haitai.syslog;import com.haitai.service.DeviceSyslogService;
import lombok.extern.slf4j.Slf4j;
import org.graylog2.syslog4j.SyslogConstants;
import org.graylog2.syslog4j.server.SyslogServer;
import org.graylog2.syslog4j.server.SyslogServerEventIF;
import org.graylog2.syslog4j.server.SyslogServerIF;
import org.graylog2.syslog4j.server.SyslogServerSessionlessEventHandlerIF;
import org.graylog2.syslog4j.server.impl.udp.UDPNetSyslogServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.SocketAddress;
import java.util.Objects;/*** Syslog 接收端** @Author xincheng.du* @Date 2023/10/25 15:13*/
@Slf4j
@Component
public class SyslogUdpUtil {@Value("${syslog.udp.ip}")private String ip;@Value("${syslog.udp.port}")private int port;@Resourceprivate DeviceSyslogService deviceSyslogService;/*** 项目启动时,启动一个Syslog服务端监听指定端口,接收设备的日志消息*/@PostConstructpublic void init() {new Thread(() -> {log.info("服务端({})监听设备syslog日志", ip + ":" + port);// 服务端SyslogServerIF serverInstance = SyslogServer.getInstance(SyslogConstants.UDP);UDPNetSyslogServerConfig serverConfig = (UDPNetSyslogServerConfig) serverInstance.getConfig();serverConfig.setHost(ip);serverConfig.setPort(port);// 防止数据过大被截取导致不完整serverConfig.setMaxMessageSize(SyslogConstants.SYSLOG_BUFFER_SIZE * 10);serverConfig.addEventHandler(new SyslogServerSessionlessEventHandlerIF() {@Overridepublic void event(SyslogServerIF syslogServerIF, SocketAddress socketAddress, SyslogServerEventIF syslogServerEventIF) {if (Objects.nonNull(syslogServerEventIF)) {deviceSyslogService.saveSyslog(socketAddress, syslogServerEventIF);}}@Overridepublic void exception(SyslogServerIF syslogServerIF, SocketAddress socketAddress, Exception e) {}@Overridepublic void initialize(SyslogServerIF syslogServerIF) {}@Overridepublic void destroy(SyslogServerIF syslogServerIF) {}});// 启动服务端serverInstance.run();}).start();}}

DeviceSyslogService.java

package com.haitai.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.haitai.domain.entity.DeviceSyslog;
import com.haitai.domain.param.syslog.DeviceSyslogQueryParam;
import com.haitai.domain.vo.TableDataInfo;
import org.graylog2.syslog4j.server.SyslogServerEventIF;import java.SocketAddress;/*** 设备syslog 相关接口** @Author xincheng.du* @Date 2023/10/25 11:06*/
public interface DeviceSyslogService extends IService<DeviceSyslog> {/*** 日志分页** @param param 参数* @return      分页列表*/TableDataInfo pageSyslog(DeviceSyslogQueryParam param);/*** 日志入库** @param socketAddress         socket地址* @param syslogServerEventIF   syslog事件拓展接口*/void saveSyslog(SocketAddress socketAddress, SyslogServerEventIF syslogServerEventIF);/*** 创建日志表** @param tableName 表名*/void createTable(String tableName);/*** 判断表是否存在** @param tableName 表名*/boolean existTable(String tableName);}

DeviceSyslogServiceImpl.java
其中 Constants.DEVICE_LOG_BASE_TABLE_NAME = “device_log”

package com.haitai.service.impl;import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.haitai.config.mybatis.RequestDataHelper;
import com.haitai.constant.Constants;
import com.haitai.constant.HttpStatus;
import com.haitai.domain.entity.DeviceSyslog;
import com.haitai.domain.param.syslog.DeviceSyslogQueryParam;
import com.haitai.domain.vo.TableDataInfo;
import com.haitai.mapper.DeviceSyslogMapper;
import com.haitai.service.DeviceSyslogService;
import com.haitai.utils.LocalCache;
import lombok.extern.slf4j.Slf4j;
import org.graylog2.syslog4j.server.SyslogServerEventIF;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.InetSocketAddress;
import java.SocketAddress;
import java.sql.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Objects;/*** 设备syslog 相关逻辑处理** @Author xincheng.du* @Date 2023/10/24 14:18*/
@Slf4j
@Service
public class DeviceSyslogServiceImpl extends ServiceImpl<DeviceSyslogMapper, DeviceSyslog> implements DeviceSyslogService {/*** 数据库用户名*/@Value("${spring.datasource.druid.master.username}")private String username;/*** 数据库密码*/@Value("${spring.datasource.druid.master.password}")private String password;/*** 数据库url*/@Value("${spring.datasource.druid.master.url}")private String url;@Overridepublic TableDataInfo pageSyslog(DeviceSyslogQueryParam param) {HashMap<String, Object> map = new HashMap<>();map.put("date", Objects.nonNull(param.getLogTime()) ? param.getLogTime() : new Date());RequestDataHelper.setRequestData(map);LambdaQueryWrapper<DeviceSyslog> deviceLambdaQueryWrapper = new LambdaQueryWrapper<>();deviceLambdaQueryWrapper.eq(Objects.nonNull(param.getId()), DeviceSyslog::getId, param.getId()).like(Objects.nonNull(param.getIp()), DeviceSyslog::getIp, param.getIp()).like(Objects.nonNull(param.getMessage()), DeviceSyslog::getMessage, param.getMessage()).eq(Objects.nonNull(param.getLevel()), DeviceSyslog::getLevel, param.getLevel())// 根据创建时间倒序排列.orderByDesc(DeviceSyslog::getCreateTime);Page<DeviceSyslog> page = new Page<>(param.getPage().longValue(), param.getLimit().longValue());Page<DeviceSyslog> result = baseMapper.selectPage(page, deviceLambdaQueryWrapper);TableDataInfo rspData = new TableDataInfo();rspData.setCode(HttpStatus.SUCCESS);rspData.setMsg("查询成功");rspData.setRows(result.getRecords());rspData.setTotal(result.getTotal());return rspData;}@Override@Transactional(rollbackFor = Exception.class)public void saveSyslog(SocketAddress socketAddress, SyslogServerEventIF syslogServerEventIF) {java.util.Date now = new Date();String yyyyMMdd = DateUtil.format(now, "yyyyMMdd");String tableName = Constants.DEVICE_LOG_BASE_TABLE_NAME + "_" + yyyyMMdd;// 表不存在先创建if (!existTable(tableName)) {createTable(tableName);}DeviceSyslog deviceSyslog = new DeviceSyslog();if (socketAddress instanceof InetSocketAddress) {InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;deviceSyslog.setIp(inetSocketAddress.getHostName());}deviceSyslog.setLevel(syslogServerEventIF.getLevel());deviceSyslog.setMessage(syslogServerEventIF.getMessage());deviceSyslog.setLogTime(syslogServerEventIF.getDate());deviceSyslog.setCreateTime(now);save(deviceSyslog);}@Overridepublic void createTable(String tableName) {try (Connection connection = DriverManager.getConnection(url, username, password);Statement statement = connection.createStatement()) {String createTableSQL = "CREATE TABLE " + tableName + " (" +"  `id` bigint NOT NULL AUTO_INCREMENT," +"  `ip` varchar(50) DEFAULT NULL," +"  `message` varchar(1000) DEFAULT NULL," +"  `level` int DEFAULT NULL," +"  `log_time` datetime DEFAULT NULL," +"  `create_time` datetime DEFAULT NULL," +"  PRIMARY KEY (`id`)" +") ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;";statement.execute(createTableSQL);log.info("系统日志表创建成功,表名:{}", tableName);LocalCache.set(tableName, Boolean.TRUE);} catch (Exception e) {LocalCache.set(tableName, Boolean.FALSE);log.error("系统日志表创建失败,表名:{}", tableName);e.printStackTrace();}}@Overridepublic boolean existTable(String tableName) {Object o = LocalCache.get(tableName);if (Objects.nonNull(o)) {return (Boolean) o;}try (Connection connection = DriverManager.getConnection(url, username, password)) {DatabaseMetaData databaseMetaData = connection.getMetaData();ResultSet resultSet = databaseMetaData.getTables(null, null, tableName, null);if (resultSet.next()) {LocalCache.set(tableName, Boolean.TRUE);return true;} else {LocalCache.set(tableName, Boolean.FALSE);return false;}} catch (Exception e) {log.error("判断表名:{} 是否存在出错,原因:{}", tableName, e.getMessage());e.printStackTrace();return false;}}}

缓存工具类 LocalCache.java

package com.haitai.utils;import cn.hutool.cache.CacheUtil;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.date.DateUnit;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** 本地缓存工具类** @Author xincheng.du* @Date 2023/7/4 13:52*/
@SuppressWarnings("unused")
public class LocalCache {private LocalCache() {}/*** 默认缓存时长*/private static final long DEFAULT_TIMEOUT = 5 *DateUnit.MINUTE.getMillis();/*** 默认清理间隔时间*/private static final long CLEAN_TIMEOUT = 5 * DateUnit.MINUTE.getMillis();/*** 缓存对象*/private static final TimedCache<String, Object> TIMED_CACHE = CacheUtil.newTimedCache(DEFAULT_TIMEOUT);static {// 启动定时任务TIMED_CACHE.schedulePrune(CLEAN_TIMEOUT);}/*** 存值** @param key   键* @param value 值*/public static void set(String key, Object value) {TIMED_CACHE.put(key, value);}/*** 设置缓存并设置过期时间** @param key       缓存key* @param value     缓存值* @param expire    过期时间,单位:ms*/public static void set(String key, Object value, long expire) {TIMED_CACHE.put(key, value, expire);}/*** 设置缓存并设置过期时间(自定义单位)** @param key       缓存key* @param value     缓存值* @param expire    过期时间,单位:ms* @param unit      过期时间单位,默认毫秒*/public static void set(String key, Object value, long expire, TimeUnit unit) {if (Objects.nonNull(unit)) {switch (unit) {case NANOSECONDS:TIMED_CACHE.put(key, value, expire / 1000000);break;case MICROSECONDS:TIMED_CACHE.put(key, value, expire / 1000);break;case SECONDS:TIMED_CACHE.put(key, value, expire * DateUnit.SECOND.getMillis());break;case MINUTES:TIMED_CACHE.put(key, value, expire * DateUnit.MINUTE.getMillis());break;case HOURS:TIMED_CACHE.put(key, value, expire * DateUnit.HOUR.getMillis());break;case DAYS:TIMED_CACHE.put(key, value, expire * DateUnit.DAY.getMillis());break;case MILLISECONDS:default:TIMED_CACHE.put(key, value, expire);break;}} else {TIMED_CACHE.put(key, value, expire);}}/*** 获取并重新计算过期时间** @param key   键* @return      值*/public static Object getWithUpdateLastAccess(String key) {return TIMED_CACHE.get(key);}/*** 取值** @param key   键* @return      值*/public static Object get(String key) {return TIMED_CACHE.get(key, false);}/*** 获取所有缓存的key** @return  key集合*/public static Set<String> keySet() {return TIMED_CACHE.keySet();}/*** 单个删除** @param key   键*/public static void remove(String key) {TIMED_CACHE.remove(key);}/*** 批量删除** @param keys  键集合*/public static void removeAll(List<String> keys) {for (String key : keys) {TIMED_CACHE.remove(key);}}/*** 获取所有key** @return  key集合*/public static Set<String> getAllKeys() {return TIMED_CACHE.keySet();}/*** 获取包含关键词的所有key** @return  key集合*/public static List<String> getContainsKeys(String keyword) {Set<String> allKeys = getAllKeys();List<String> list = new ArrayList<>();allKeys.forEach(item -> {if (item.contains(keyword)) {list.add(item);}});return list;}/*** 清空缓存*/public static void clear() {TIMED_CACHE.clear();}}

动态表名拦截器 DynamicDateTableNameHandler.java:

package com.haitai.config.mybatis;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.extension.plugins.handler.TableNameHandler;
import com.haitai.constant.Constants;
import lombok.NoArgsConstructor;
import org.apachemons.lang3.StringUtils;import java.util.Date;
import java.util.Map;
import java.util.Objects;/*** 动态日期表名替换处理器** @Author xincheng.du* @Date 2023/10/25 10:25*/
@NoArgsConstructor
public class DynamicDateTableNameHandler implements TableNameHandler {@Overridepublic String dynamicTableName(String sql, String tableName) {if (StringUtils.isNotBlank(tableName) && Constants.DEVICE_LOG_BASE_TABLE_NAME.equals(tableName)) {Date date;Map<String, Object> paramMap = RequestDataHelper.getRequestData();if (CollUtil.isNotEmpty(paramMap) && Objects.nonNull(paramMap.get("date"))) {date = (Date) paramMap.get("date");} else {// 为空日期取当天date = new Date();}String tableNameSuffix = "_" + DateUtil.format(date, "yyyyMMdd");return Constants.DEVICE_LOG_BASE_TABLE_NAME + tableNameSuffix;}return tableName;}}

将动态表名拦截器添加到MP的拦截器链中 MybatisPlusConfig.java :

package com.haitai.config.mybatis;import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author dideng.zhang* @version 1.0* @date 2023/5/19 14:29*/
@Configuration
public class MybatisPlusConfig {@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();PaginationInnerInterceptor paginationInnerInterceptor = new PaginationInnerInterceptor();paginationInnerInterceptor.setDbType(DbType.MYSQL);interceptor.addInnerInterceptor(paginationInnerInterceptor);// 配置动态表名拦截器DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor = new DynamicTableNameInnerInterceptor();DynamicDateTableNameHandler dynamicDateTableNameHandler = new DynamicDateTableNameHandler();dynamicTableNameInnerInterceptor.setTableNameHandler(dynamicDateTableNameHandler);interceptor.addInnerInterceptor(dynamicTableNameInnerInterceptor);return interceptor;}
}

请求参数传递辅助类

package com.haitai.config.mybatis;import cn.hutool.core.collection.CollUtil;import java.util.Map;/*** 请求参数传递辅助类*/
public class RequestDataHelper {private RequestDataHelper() {}/*** 请求参数存取*/private static final ThreadLocal<Map<String, Object>> REQUEST_DATA = new ThreadLocal<>();/*** 设置请求参数** @param requestData 请求参数 MAP 对象*/public static void setRequestData(Map<String, Object> requestData) {REQUEST_DATA.set(requestData);}/*** 获取请求参数** @param param 请求参数* @return 请求参数 MAP 对象*/public static <T> T getRequestData(String param) {Map<String, Object> dataMap = getRequestData();if (CollUtil.isNotEmpty(dataMap)) {return (T) dataMap.get(param);}return null;}/*** 获取请求参数** @return 请求参数 MAP 对象*/public static Map<String, Object> getRequestData() {return REQUEST_DATA.get();}
}

3、Syslog服务端(发送日志的服务器)

3.1 rsyslog.conf配置修改

(1)打开服务器的 /etc/rsyslog.conf 配置文件:

vi /etc/rsyslog.conf

(2)在文件末尾添加:

*.* @192.168.110.28:3780

此处的 ip 和 port 和你上述代码中的对应即可,同时还要保证这两台设备能相互访问。这里的一个“@”代表使用UDP通信,两个“@”代表使用TCP通信。
(3)保存并关闭文件,先按 esc,然后输入:

:wq

(4)防火墙开放3780端口:

firewall-cmd --permanent --add-port=3780/udp

(5)重置防火墙

firewall-cmd --reload

(6)重启 syslog 服务:

systemctl restart rsyslog

4. 测试

启动 Java 项目,过一会儿,可以看到我们的表已经创建成功了,同时已经有数据入库。

更多推荐

用 Java 实现 Syslog 功能

本文发布于:2023-11-17 12:35:41,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1643694.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:功能   Java   Syslog

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!