Step By Step
1、創建ES實例和LogStash實例
2、為LogStash配置SNAT
LogStash默認僅有VPC內網環境,如果想使用公網的數據源,需要配置NAT網關,本示例是從公網MySQL讀取數據,所以需要配置SNAT。
3、將SNAT IP加入MySQL網絡白名單
4、數據庫基本信息獲取
5、數據庫中建表和插入數據
-- Doctor consultation price table: one row per doctor's custom price.
-- Dropped and recreated so the demo script is repeatable.
DROP TABLE IF EXISTS `doctor_advisory_price_1`;
CREATE TABLE `doctor_advisory_price_1` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
-- Doctor ID. NOTE(review): not unique in the sample data below (e.g. 61913713
-- appears several times); downstream consumers keying on doctor_id will collapse rows.
`doctor_id` bigint(20) NOT NULL COMMENT '醫生ID',
-- Consultation price, stored as an exact integer amount in fen (cents).
`advisory_price` int(10) NOT NULL COMMENT '諮詢價格:分',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- Auto-updated on every write; suitable as an incremental-sync tracking column.
`gmt_modify` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
-- utf8mb4 instead of utf8: MySQL's "utf8" is the 3-byte utf8mb3 subset and
-- cannot store 4-byte characters (emoji, supplementary CJK).
) ENGINE=InnoDB AUTO_INCREMENT=304 DEFAULT CHARSET=utf8mb4 COMMENT='醫生諮詢自定義價格表';
-- ----------------------------
-- Records of doctor_advisory_price_1
-- ----------------------------
-- Sample data (30 rows). Explicit column list so the statement keeps working
-- if the table gains columns later. gmt_create / gmt_modify are supplied
-- explicitly to override the CURRENT_TIMESTAMP defaults with fixed test values.
-- NOTE(review): several doctor_id values repeat on purpose (e.g. ids 11/14/17/20
-- all carry doctor_id 61913713), so any downstream index keyed by doctor_id
-- will contain fewer documents than this table has rows.
INSERT INTO `doctor_advisory_price_1`
    (`id`, `doctor_id`, `advisory_price`, `gmt_create`, `gmt_modify`)
VALUES
    (1, 123456, 2000, '2018-09-05 16:34:09', '2018-09-05 16:37:44'),
    (2, 1823784, 100, '2018-09-11 11:25:34', '2019-07-02 15:44:24'),
    (3, 1000247, 0, '2018-09-11 11:41:31', '2018-12-18 17:44:54'),
    (4, 44612299, 100, '2018-09-11 13:55:33', '2019-12-26 15:40:27'),
    (5, 44612298, 300, '2018-09-11 14:33:48', '2019-01-18 14:32:31'),
    (6, 61823709, 10000, '2018-09-11 16:28:57', '2018-09-11 16:28:57'),
    (7, 1899974, 10000, '2018-09-11 16:30:03', '2018-09-11 16:30:03'),
    (8, 61823711, 10000, '2018-09-11 17:16:07', '2018-09-11 17:16:07'),
    (9, 1610524, 0, '2018-09-11 17:31:50', '2019-08-22 14:16:45'),
    (10, 61823712, 2500, '2018-09-11 17:32:51', '2018-09-12 11:29:54'),
    (11, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38'),
    (12, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36'),
    (13, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29'),
    (14, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38'),
    (15, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36'),
    (16, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29'),
    (17, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38'),
    (18, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36'),
    (19, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29'),
    (20, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38'),
    (21, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36'),
    (22, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29'),
    (23, 32321043, 10000, '2018-09-13 18:20:19', '2018-09-13 18:20:19'),
    (24, 62023722, 0, '2018-09-14 10:28:00', '2018-09-14 10:28:00'),
    (25, 49522775, 0, '2018-09-14 11:00:23', '2019-05-28 11:47:12'),
    (26, 50622828, 100, '2018-09-14 14:08:55', '2019-12-27 14:41:14'),
    (27, 31210890, 100, '2018-09-14 14:27:48', '2019-01-15 14:24:37'),
    (28, 45822396, 200, '2018-09-14 14:59:14', '2019-01-18 10:25:16'),
    (29, 47322576, 100, '2018-09-15 10:01:26', '2018-09-15 10:01:26'),
    (30, 50632833, 0, '2018-09-15 10:09:24', '2018-10-10 16:58:38');
6、LogStash上傳插件(mysql-connector-java-8.0.18.jar)
7、ElasticSearch開啟允許自動創建索引
8、pipeline
# The input plugin listens on a port of the Logstash node; use the 8000-9000 range.
input {
jdbc {
# Driver jar uploaded in step 6.
jdbc_driver_library => "/ssd/1/share/<LogStash實例Id>/logstash/current/config/custom/mysql-connector-java-8.0.18.jar"
# Connector/J 8.x class name; the legacy "com.mysql.jdbc.Driver" is deprecated
# in 8.0 and only works via a deprecation shim.
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<******>.mysql.rds.aliyuncs.com:3306/******?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
jdbc_user => "******"
jdbc_password => "<密碼>"
# Run once per minute.
schedule => "* * * * *"
# Incremental sync: only fetch rows changed since the previous run.
# :sql_last_value is the persisted tracking value (see tracking_column /
# last_run_metadata_path below); without this WHERE clause every run
# re-reads the whole table and the tracking column is never used.
statement => "SELECT * FROM doctor_advisory_price_1 WHERE gmt_modify > :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "gmt_modify"
last_run_metadata_path => "/ssd/1/<logstash id>/logstash/data/doctor_advisory_price-20201210.txt"
# Keep the saved tracking value across pipeline restarts.
clean_run => false
}
}
filter {
}
output {
elasticsearch {
hosts => "http://******.elasticsearch.aliyuncs.com:9200"
user => "elastic"
password => "<密碼>"
index => "doctor_test_01"
# NOTE(review): doctor_id is not unique in the sample data, so rows sharing a
# doctor_id collapse into a single ES document (last write wins).
document_id => "%{doctor_id}"
}
# The file_extend output lets you inspect pipeline results right after deployment
# for verification and debugging. Do not modify the system-assigned path; comment
# out or delete the file_extend block to disable debug output.
# file_extend {
# path => "/ssd/1/<logstash id>/logstash/logs/debug/mysql_to_es3"
# }
}
9、Kibana 查看