flink定时器实现订单自动好评

编程入门 行业动态 更新时间:2024-10-28 05:17:29

[flink](https://www.elefans.com/category/jswz/34/1768257.html)定时器实现订单自动好评

flink定时器实现订单自动好评

正常开发环境数据流转为

数据库 --通过canal实时采集--> kafka --> flink实时计算 --> 数据库

本文为了测试方便,将数据流转更改为

数据库 --> flink实时计算 --> 数据库

1.测试数据

-- ----------------------------
-- Table structure for torder
-- ----------------------------
-- One row per order. The Flink job described in this article reads every row
-- and later flips `orderstate` / `orderreview` for orders nobody reviewed.
DROP TABLE IF EXISTS `torder`;
CREATE TABLE `torder` (
    -- Order identifier; the sample rows use UUID strings.
    `orderid` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    -- Time the order was placed. The sample values are 13-digit integers,
    -- presumably epoch milliseconds (confirm against the producer).
    `ordertime` bigint(20) DEFAULT NULL,
    -- Review text; the sample rows use '无' for orders without a review.
    `orderreview` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    -- Review state: 0 = not reviewed, 1 = reviewed.
    `orderstate` int(8) DEFAULT NULL,
    PRIMARY KEY (`orderid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of torder
-- ----------------------------
INSERT INTO `torder` VALUES ('018ffeb2-7ebe-4dad-b29e-add4e79260ed', 1611626060628, '无', 0);
INSERT INTO `torder` VALUES ('02bd0f7c-4cac-4b91-be7e-6eb6f3fec969', 1611626074635, '无', 0);
INSERT INTO `torder` VALUES ('03011113-25a3-4956-acd9-1d3630c4777e', 1611626057627, '无', 0);
INSERT INTO `torder` VALUES ('0408d88f-3254-42c8-9079-f9dabfe98ca6', 1611626070632, '无', 0);
INSERT INTO `torder` VALUES ('05de50bf-12c2-4887-ae6b-bbde3085d0d9', 1611626113656, '无', 0);
INSERT INTO `torder` VALUES ('064b5cf7-6c7b-4822-8012-a653d3e5c79c', 1611626099649, '无', 0);
INSERT INTO `torder` VALUES ('09d9e8aa-276f-4ba1-aea3-fd80042f0b5a', 1611626103651, '无', 0);
INSERT INTO `torder` VALUES ('0a838c02-ba03-406a-9c40-d0d53895781b', 1611626058628, '无', 0);
INSERT INTO `torder` VALUES ('0be872f8-01fc-4d7c-9682-95e38fad0308', 1611626053624, '无', 0);
INSERT INTO `torder` VALUES ('0cb1f712-6691-4d16-89f8-dd3ba5d7ed4d', 1611626081639, '无', 0);
INSERT INTO `torder` VALUES ('10049263-1cb2-45d5-a887-6289aa9cd706', 1611626090643, '无', 0);
INSERT INTO `torder` VALUES ('10b6e6a0-eefb-492f-8e17-550d3623be0f', 1611626111655, '无', 0);
INSERT INTO `torder` VALUES ('1101393f-0000-476e-b875-de15c91e3c99', 1611626080638, '无', 0);
INSERT INTO `torder` VALUES ('114d15cf-75e5-4925-8c95-943ccf9f748c', 1611626115657, '无', 0);
INSERT INTO `torder` VALUES ('126eaaec-e49a-40a5-9755-3937ef45f3f9', 1611626094647, '无', 0);
INSERT INTO `torder` VALUES ('12e7bd68-d2d1-43a4-891b-70611d88bb68', 1611626062628, '无', 0);
INSERT INTO `torder` VALUES ('13df9483-e6d2-488f-9a4c-7b945723845f', 1611626111655, '无', 0);
INSERT INTO `torder` VALUES ('14435e3e-1b5d-43a3-8705-97304f2a8d80', 1611626058628, '无', 0);
INSERT INTO `torder` VALUES ('157c680c-40fc-4e9f-80a3-a7dc3864a387', 1611626072634, '无', 0);
INSERT INTO `torder` VALUES ('16782d32-7f48-44f6-9666-ab5316508d86', 1611626067631, '无', 0);
INSERT INTO `torder` VALUES ('1909c1c1-aac7-4b61-86c5-5cd4ed913ff9', 1611626081639, '无', 0);
INSERT INTO `torder` VALUES ('19283a3c-57c2-4dc9-91a1-33c0699a2db2', 1611626079638, '无', 0);
INSERT INTO `torder` VALUES ('1a2338a5-c014-497e-a6df-1470ae4c8b31', 1611626108653, '无', 0);
INSERT INTO `torder` VALUES ('1b1d9a8c-fe4b-4e61-a20c-5f16fd8fc404', 1611626077637, '无', 0);
INSERT INTO `torder` VALUES ('1b888529-ced3-44ec-9d02-fdd4ea7b08f2', 1611626095647, '无', 0);
INSERT INTO `torder` VALUES ('1ce74e17-b83f-478f-a476-7664aa262740', 1611626098648, '无', 0);
INSERT INTO `torder` VALUES ('1d111238-ec0d-486c-b8bc-e3329a2f17b7', 1611626106653, '无', 0);
INSERT INTO `torder` VALUES ('1f698c02-e463-4e4f-b4c0-6d837c110012', 1611626092645, '无', 0);
INSERT INTO `torder` VALUES ('2072424d-9a0c-4482-9ba9-35feaa6fef73', 1611626086642, '无', 0);
INSERT INTO `torder` VALUES ('20bc0e69-3209-472e-a963-9e367dd35207', 1611626080638, '无', 0);
INSERT INTO `torder` VALUES ('21510d73-410e-40bc-a335-294dad9cd475', 1611626092646, '无', 0);
INSERT INTO `torder` VALUES ('22768d54-a752-4171-acc5-71595955eb08', 1611626091644, '无', 0);
INSERT INTO `torder` VALUES ('22c2e658-e1f8-49ed-a43c-51857ef4a679', 1611626070632, '无', 0);
INSERT INTO `torder` VALUES ('24b96afa-c196-47f0-8e7c-07feeb7afeeb', 1611626108653, '无', 0);
INSERT INTO `torder` VALUES ('26888908-17c7-4b1b-8303-c3ade7ba40a9', 1611626097648, '无', 0);
INSERT INTO `torder` VALUES ('272f8beb-20e2-4227-8f4d-554325854972', 1611626054625, '无', 0);
INSERT INTO `torder` VALUES ('27546f7e-617f-4033-8e11-bfd955d889e9', 1611626099649, '无', 0);
INSERT INTO `torder` VALUES ('28e663d2-f230-4f0e-98da-ed9e0269b8f6', 1611626078637, '无', 0);
INSERT INTO `torder` VALUES ('2980bef1-b94b-43ef-a549-b4bb1084e742', 1611626072634, '无', 0);
INSERT INTO `torder` VALUES ('2a57c72f-124d-4c11-ac1b-1a0942285061', 1611626103651, '无', 0);
INSERT INTO `torder` VALUES ('2db33049-b012-4750-ad58-6dafb55e98ae', 1611626074635, '无', 0);
INSERT INTO `torder` VALUES ('2f5ceb8b-4323-4d38-a44e-6982480b8c45', 1611626091645, '无', 0);
INSERT INTO `torder` VALUES ('2fb4119c-735a-42c1-ae5f-09d65832a469', 1611626072634, '无', 0);
INSERT INTO `torder` VALUES ('321e8fb1-2435-4fc1-bbee-2937e89753d1', 1611626112655, '无', 0);
INSERT INTO `torder` VALUES ('336e16e8-f381-494a-993a-27ded5aa607c', 1611626060628, '无', 0);
INSERT INTO `torder` VALUES ('369618a4-5bb2-4770-aaff-b0df0baf3a52', 1611626112655, '无', 0);
INSERT INTO `torder` VALUES ('37f8c062-2d82-49af-9cfe-5e03de93191f', 1611626056626, '无', 0);
INSERT INTO `torder` VALUES ('3952a7a7-8eb5-48e6-9d9a-1a2c615ad8b0', 1611626050623, '无', 0);
INSERT INTO `torder` VALUES ('397e8387-49be-4215-8128-b813a7b1f430', 1611626109654, '无', 0);
INSERT INTO `torder` VALUES ('3afb23fa-0e51-4ab8-b994-94f2df6f3f15', 1611626105652, '无', 0);
INSERT INTO `torder` VALUES ('3b1a6415-150f-4fd0-90ad-2a6ca5a43ee7', 1611626084640, '无', 0);
INSERT INTO `torder` VALUES ('3b432330-053e-45b9-9e9c-71bfd1249f2c', 1611626069632, '无', 0);
INSERT INTO `torder` VALUES ('3b84a997-89c7-4c13-aa9e-bf0437127180', 1611626104651, '无', 0);
INSERT INTO `torder` VALUES ('3f5da148-d8f6-44d9-aa70-adc46c7bf2e4', 1611626071633, '无', 0);
INSERT INTO `torder` VALUES ('40f23c7b-95a5-4603-ab03-a576aa6fff1a', 1611626068631, '无', 0);
INSERT INTO `torder` VALUES ('41dab6d2-c8b6-474c-bb2e-b1694b4d32c8', 1611626085641, '无', 0);
INSERT INTO `torder` VALUES ('42d4a356-5340-4ccf-a748-ceb2f7a79f81', 1611626058628, '无', 0);
INSERT INTO `torder` VALUES ('438b2f39-8bb0-4cc7-9810-f688a7a23775', 1611626101650, '无', 0);
INSERT INTO `torder` VALUES ('44a60fe6-4625-41c4-8690-068134f01481', 1611626059628, '无', 0);
INSERT INTO `torder` VALUES ('45fe803d-f0f7-47ee-b851-40d679c4b1ec', 1611626115657, '无', 0);
INSERT INTO `torder` VALUES ('478f8bd3-39bf-42a2-8aca-e47844fde1b9', 1611626061628, '无', 0);
INSERT INTO `torder` VALUES ('49fbabf8-ec64-4c96-b5e6-affbe4b3a7c4', 1611626074635, '无', 0);
INSERT INTO `torder` VALUES ('4b4b4471-1e67-4f0a-88eb-5200c9bdb97c', 1611626056626, '无', 0);
INSERT INTO `torder` VALUES ('4dc065e9-0cb9-4356-9847-1fb652dc6907', 1611626082640, '无', 0);
INSERT INTO `torder` VALUES ('4e0ba020-fcff-47cb-8b12-fb1ceddeaa74', 1611626070632, '无', 0);
INSERT INTO `torder` VALUES ('4ee933a7-c420-4592-a102-5bf4c3b1564d', 1611626065630, '无', 0);
INSERT INTO `torder` VALUES ('4f3ce3ef-d613-45c6-a28c-cb27c5d7f63f', 1611626098648, '无', 0);
INSERT INTO `torder` VALUES ('5067590e-0852-4c9c-85f0-09fb78062751', 1611626112655, '无', 0);
INSERT INTO `torder` VALUES ('5199ce7b-aee7-4c8c-9f5e-243b3358b6a4', 1611626052623, '无', 0);
INSERT INTO `torder` VALUES ('52c06133-b90e-448b-b0b3-0cc4cf35f0c1', 1611626051623, '无', 0);
INSERT INTO `torder` VALUES ('538b10c9-1650-4c25-ae87-bfb7aa1965e6', 1611626105652, '无', 0);
INSERT INTO `torder` VALUES ('5551abb4-12e1-4413-8e18-a3b26fcac91a', 1611626086642, '无', 0);
INSERT INTO `torder` VALUES ('59819a27-4ab2-4d36-a212-f7e1b4cafadb', 1611626083640, '无', 0);
INSERT INTO `torder` VALUES ('5ac6867f-4805-4e22-ba99-015d7909c6c7', 1611626107653, '无', 0);
INSERT INTO `torder` VALUES ('5b81d266-b268-4a1d-a63d-802d3225c5a0', 1611626073635, '无', 0);
INSERT INTO `torder` VALUES ('5db888a4-b23a-4785-886a-2e9a7be9df6f', 1611626057627, '无', 0);
INSERT INTO `torder` VALUES ('5e3713e6-fc1d-4fda-a15f-375129398104', 1611626081639, '无', 0);
INSERT INTO `torder` VALUES ('62578f3a-366c-4b16-bda7-8920aac73b9b', 1611626116658, '无', 0);
INSERT INTO `torder` VALUES ('663e6878-810f-40d1-8b0d-224c9d4b044a', 1611626106653, '无', 0);
INSERT INTO `torder` VALUES ('667c3f44-c6ef-49d0-ba52-b8b181931d1b', 1611626052623, '无', 0);
INSERT INTO `torder` VALUES ('66a9969a-f2f2-4359-a368-07c9dcc74c87', 1611626057627, '无', 0);
INSERT INTO `torder` VALUES ('66aab434-b3ce-4d3e-a8f2-7f4d6cddac10', 1611626055626, '无', 0);
INSERT INTO `torder` VALUES ('6872344b-3fc3-418b-b703-2acf58485ecb', 1611626093646, '无', 0);
INSERT INTO `torder` VALUES ('69da3582-b7eb-4358-87b5-3d4f98aeb7fc', 1611626090644, '无', 0);
INSERT INTO `torder` VALUES ('6b66a8cc-2d8a-4092-9233-c9883698860b', 1611626083640, '无', 0);
INSERT INTO `torder` VALUES ('6b6ede0c-ca0d-4547-8931-f34c204edbaf', 1611626075635, '无', 0);
INSERT INTO `torder` VALUES ('6ce458a3-bd99-4a2c-885a-e98f5b4ffced', 1611626107653, '无', 0);
INSERT INTO `torder` VALUES ('6d4e4459-c34e-4334-8252-596bd389c23e', 1611626105652, '无', 0);
INSERT INTO `torder` VALUES ('6f971545-a35a-4bcb-a2ac-1ee095598425', 1611626089644, '无', 0);
INSERT INTO `torder` VALUES ('7114f869-9d96-4405-9c49-b572eddfe308', 1611626104651, '无', 0);
INSERT INTO `torder` VALUES ('7263ae49-d3a4-4f2b-8ddc-25aa85c48fa3', 1611626090644, '无', 0);
INSERT INTO `torder` VALUES ('738a0186-f7f2-4e0d-b453-41e697fc5f42', 1611626076636, '无', 0);
INSERT INTO `torder` VALUES ('77bc1314-804e-496b-9113-ee2d8a82b054', 1611626097648, '无', 0);
INSERT INTO `torder` VALUES ('782c1b12-cc55-4f22-bfe3-4f06e07a3656', 1611626065630, '无', 0);
INSERT INTO `torder` VALUES ('799219d1-77f3-4b01-9ab8-5d27b88e13ff', 1611626087643, '无', 0);
INSERT INTO `torder` VALUES ('7a35244a-6edc-48c3-9640-c563effdbac2', 1611626089643, '无', 0);
INSERT INTO `torder` VALUES ('7b1f60c9-7037-4b73-8992-e6c5b918f98f', 1611626114656, '无', 0);
INSERT INTO `torder` VALUES ('7dba73b0-a7b3-4916-911b-495d7f478246', 1611626066631, '无', 0);
INSERT INTO `torder` VALUES ('7f1210bd-ca26-43bb-b131-19185baa5476', 1611626109654, '无', 0);
INSERT INTO `torder` VALUES ('7f93137f-a195-40df-9391-1a62e1f6aa45', 1611626111655, '无', 0);
INSERT INTO `torder` VALUES ('8134494c-d1a8-4db6-941e-fa02f695818c', 1611626110654, '无', 0);
INSERT INTO `torder` VALUES ('813e1c34-cc03-4e9c-bc13-59c849ed50c3', 1611626089644, '无', 0);
INSERT INTO `torder` VALUES ('8143ebc5-ffc9-4021-b999-8e6975115903', 1611626114656, '无', 0);
INSERT INTO `torder` VALUES ('8282f23f-1b24-4638-9e80-5af45ae3e404', 1611626116658, '无', 0);
INSERT INTO `torder` VALUES ('838cc3c0-6b68-4620-919b-61e7bb98cf3f', 1611626100649, '无', 0);
INSERT INTO `torder` VALUES ('84fe4cfc-153d-497e-904f-0b17578174dc', 1611626087642, '无', 0);
INSERT INTO `torder` VALUES ('881eed4a-b08f-4b34-9f28-093aae146af5', 1611626064630, '无', 0);
INSERT INTO `torder` VALUES ('881fa7e5-0aed-4598-8bae-ee95e96cd368', 1611626116658, '无', 0);
INSERT INTO `torder` VALUES ('89f54007-cb24-4f7d-ad2b-122e1e9f9946', 1611626064630, '无', 0);
INSERT INTO `torder` VALUES ('8b5f0684-0714-43d7-85ee-b64c1bb4dfd3', 1611626055626, '无', 0);
INSERT INTO `torder` VALUES ('8e8b058e-2425-4b45-8a38-03bfaf08a8db', 1611626077637, '无', 0);
INSERT INTO `torder` VALUES ('91467427-d5d0-4998-9f70-0d97b229e759', 1611626094646, '无', 0);
INSERT INTO `torder` VALUES ('9260709e-55fe-4893-bc0a-923e7edcd796', 1611626109654, '无', 0);
INSERT INTO `torder` VALUES ('93647b3d-84bd-4439-8ad0-658bbdae9f62', 1611626091645, '无', 0);
INSERT INTO `torder` VALUES ('93ce8e43-70d9-4000-8360-15f795c9340e', 1611626101650, '无', 0);
INSERT INTO `torder` VALUES ('99d372a3-c9b4-44ee-81c0-3c536c26754c', 1611626061628, '无', 0);
INSERT INTO `torder` VALUES ('99debad1-0e80-41ff-8f7e-c41dc794beba', 1611626098648, '无', 0);
INSERT INTO `torder` VALUES ('a03084dd-3509-4d85-8adc-536c277dd2ad', 1611626101650, '无', 0);
INSERT INTO `torder` VALUES ('a11c1ce1-3d13-46c9-9a0f-7665d3e8796e', 1611626066631, '无', 0);
INSERT INTO `torder` VALUES ('a2284585-41c5-4f5d-a1ce-14a2256d402b', 1611626065630, '无', 0);
INSERT INTO `torder` VALUES ('a2e612b4-b07e-4775-9d51-fa79aedf4b7b', 1611626071633, '无', 0);
INSERT INTO `torder` VALUES ('a6ac6d7f-90ea-420a-b010-a12969100b99', 1611626086642, '无', 0);
INSERT INTO `torder` VALUES ('a933e7d1-c28c-4497-b132-6546c1a5c72d', 1611626083640, '无', 0);
INSERT INTO `torder` VALUES ('aae8ae85-29bc-4b00-9d9c-42007563b5d0', 1611626096647, '无', 0);
INSERT INTO `torder` VALUES ('ab5a4a85-d79f-4eb6-bfd4-9946fad1306f', 1611626076636, '无', 0);
INSERT INTO `torder` VALUES ('ab77ee7d-cbee-460c-8f4c-eac874ec8931', 1611626051623, '无', 0);
INSERT INTO `torder` VALUES ('ac49c6a6-c135-4a2f-95c1-205363f81bd1', 1611626093647, '无', 0);
INSERT INTO `torder` VALUES ('ac6357f8-8f67-48e4-97ed-781af0c777fa', 1611626092646, '无', 0);
INSERT INTO `torder` VALUES ('acf75a31-ba47-43cb-8293-2cc47d83869f', 1611626079638, '无', 0);
INSERT INTO `torder` VALUES ('aea02638-58a5-4c2b-8853-566dc3ecf890', 1611626077637, '无', 0);
INSERT INTO `torder` VALUES ('b0634612-7cba-4a4c-a153-9515a28c04b8', 1611626113656, '无', 0);
INSERT INTO `torder` VALUES ('b19d27f9-264b-4252-ba03-0b4b6f8d6471', 1611626102650, '无', 0);
INSERT INTO `torder` VALUES ('b382706b-062b-4dab-9587-b0d3e9d8e429', 1611626068631, '无', 0);
INSERT INTO `torder` VALUES ('b42cda1a-5f42-4d87-8cc8-8f596c08a748', 1611626110654, '无', 0);
INSERT INTO `torder` VALUES ('b5b31236-7924-491b-885a-a439e02d188a', 1611626063629, '无', 0);
INSERT INTO `torder` VALUES ('b84fc5af-b4f1-4896-a637-eec99102a1d1', 1611626080638, '无', 0);
INSERT INTO `torder` VALUES ('b8ddabde-514c-4e73-b340-31e390adbe26', 1611626102650, '无', 0);
INSERT INTO `torder` VALUES ('bae9fbec-3bea-4e57-9b70-8bfce10912f8', 1611626067631, '无', 0);
INSERT INTO `torder` VALUES ('bd146ac6-6ad1-471f-8f34-1d2ced692e3e', 1611626054625, '无', 0);
INSERT INTO `torder` VALUES ('bdc71803-db87-4a26-af03-42768ef28a9b', 1611626096647, '无', 0);
INSERT INTO `torder` VALUES ('be759ecb-d3b3-4928-880d-99988ebfdba3', 1611626076636, '无', 0);
INSERT INTO `torder` VALUES ('be771f1c-a599-4793-9f5c-5528eeb19c3f', 1611626050623, '无', 0);
INSERT INTO `torder` VALUES ('beb5d47d-fa7e-4a73-9c4b-ad6b9e37e5d5', 1611626063629, '无', 0);
INSERT INTO `torder` VALUES ('bfb50bce-765c-491d-9ce7-7be26813f812', 1611626114656, '无', 0);
INSERT INTO `torder` VALUES ('c138ccea-687c-4689-af05-ecfb53c48f45', 1611626051623, '无', 0);
INSERT INTO `torder` VALUES ('c1c6dd38-b108-48c4-be4b-4f3d63044a88', 1611626096646, '无', 0);
INSERT INTO `torder` VALUES ('c2c64308-e3e9-4d12-8dc8-287f06cf471a', 1611626066631, '无', 0);
INSERT INTO `torder` VALUES ('c42cd4b7-e86e-4f5e-a7b4-0495f222c475', 1611626075635, '无', 0);
INSERT INTO `torder` VALUES ('c45ef21e-20c1-4157-b3d9-c84c79c31e7b', 1611626088643, '无', 0);
INSERT INTO `torder` VALUES ('c56676f8-3b2c-4381-84ad-042f14c376c7', 1611626102650, '无', 0);
INSERT INTO `torder` VALUES ('c64e601c-0c53-4efd-bc7f-5adc75d36c5b', 1611626084640, '无', 0);
INSERT INTO `torder` VALUES ('c6a89373-36f2-4b4d-8e3e-48795afbfada', 1611626069632, '无', 0);
INSERT INTO `torder` VALUES ('c6f09153-d3cf-4147-9e79-1358a43859e6', 1611626084640, '无', 0);
INSERT INTO `torder` VALUES ('c88ad6b3-70e7-4dd9-a2ed-4ab0db3d2c99', 1611626082640, '无', 0);
INSERT INTO `torder` VALUES ('c8f82107-5c40-484d-b148-abb489e8dd2d', 1611626053624, '无', 0);
INSERT INTO `torder` VALUES ('c947e0a5-c551-4b8d-9cbc-22cd85c18bff', 1611626115657, '无', 0);
INSERT INTO `torder` VALUES ('ca461d65-83f5-4e12-ae82-674fc7053b7a', 1611626071633, '无', 0);
INSERT INTO `torder` VALUES ('cc75040c-1af0-4647-ae38-12dcfa95b477', 1611626075635, '无', 0);
INSERT INTO `torder` VALUES ('cd36234c-6b4e-4dbb-9edf-18b20869ee43', 1611626073635, '无', 0);
INSERT INTO `torder` VALUES ('cddf2435-989c-48e5-abce-ef9a43bd9c80', 1611626062628, '无', 0);
INSERT INTO `torder` VALUES ('cfef8b6f-5d1a-4b25-a145-4c0855cb6218', 1611626068631, '无', 0);
INSERT INTO `torder` VALUES ('d1617f6f-6067-444e-8400-bd794e8ffe9c', 1611626059628, '无', 0);
INSERT INTO `torder` VALUES ('d18b3b60-5cc3-418d-a86d-2060b1204301', 1611626079638, '无', 0);
INSERT INTO `torder` VALUES ('d24cab2d-a3ec-472d-b4ff-c827def060bf', 1611626067631, '无', 0);
INSERT INTO `torder` VALUES ('d2e936b4-e2e6-4f73-b6f7-bc20bedd3e38', 1611626082640, '无', 0);
INSERT INTO `torder` VALUES ('d34009ae-4134-430c-a7d8-a0cea3b43124', 1611626106653, '无', 0);
INSERT INTO `torder` VALUES ('d7f01407-42a3-4bd6-9c6b-b6fba6222bb7', 1611626052623, '无', 0);
INSERT INTO `torder` VALUES ('d8abb597-baf4-46cd-9e68-77afa1d78e67', 1611626113656, '无', 0);
INSERT INTO `torder` VALUES ('d91d00a5-ccfe-4c22-8195-d6bea98b6cdb', 1611626049609, '666', 1);
INSERT INTO `torder` VALUES ('d9a5ba96-3433-48ec-a73b-b90fb2107f29', 1611626095647, '无', 0);
INSERT INTO `torder` VALUES ('db32338c-81f9-43b7-9d11-8c60445b4f6f', 1611626063629, '无', 0);
INSERT INTO `torder` VALUES ('de8bb9f0-e582-4388-b4e5-44becd4bc82d', 1611626108653, '无', 0);
INSERT INTO `torder` VALUES ('e0d82954-248f-404f-b474-dc1efd0b9f98', 1611626095646, '无', 0);
INSERT INTO `torder` VALUES ('e30c2873-a9b3-4127-88b5-840a4f2e68ac', 1611626060628, '无', 0);
INSERT INTO `torder` VALUES ('e30dc1d9-4a83-4936-b7b4-e0d1138cb7f6', 1611626099649, '无', 0);
INSERT INTO `torder` VALUES ('e4262e5a-6bb4-41fd-a002-b4789a078b98', 1611626110654, '无', 0);
INSERT INTO `torder` VALUES ('e4576b70-09a8-4c34-9fab-f4097a624447', 1611626085641, '无', 0);
INSERT INTO `torder` VALUES ('e4755e9f-29dd-4a44-9a8f-33c363aadd35', 1611626103651, '无', 0);
INSERT INTO `torder` VALUES ('e688b7ed-9e26-46b7-a6cc-024a29ef77ab', 1611626097647, '无', 0);
INSERT INTO `torder` VALUES ('e691ea85-33e4-4a25-96c8-ae03016a2c61', 1611626094647, '无', 0);
INSERT INTO `torder` VALUES ('e90d5593-c57e-4112-a34a-c458eb3d8c08', 1611626059628, '无', 0);
INSERT INTO `torder` VALUES ('e93a94ee-f9b8-47c4-936d-0f0564626a27', 1611626056626, '无', 0);
INSERT INTO `torder` VALUES ('e9bc62b6-b021-445c-956b-3b120ee5ef4d', 1611626049609, '无', 0);
INSERT INTO `torder` VALUES ('eb26d8fe-1913-45ad-9b05-665cfe6ec9ec', 1611626078637, '无', 0);
INSERT INTO `torder` VALUES ('ecf2d8f2-bd22-43f6-8e7e-5c9d6950eb16', 1611626049609, '无', 0);
INSERT INTO `torder` VALUES ('edfbc077-0f18-4bc1-badf-e4d5bf2de215', 1611626055626, '无', 0);
INSERT INTO `torder` VALUES ('ee18f1b0-cd7f-4201-9f5e-7209f0215e27', 1611626053624, '无', 0);
INSERT INTO `torder` VALUES ('eea9a89d-3cb1-4b40-ad9d-c12c553ee8e8', 1611626062628, '无', 0);
INSERT INTO `torder` VALUES ('efcce558-c3ac-470f-b841-de9c62b5921f', 1611626069632, '无', 0);
INSERT INTO `torder` VALUES ('f08453bc-39a8-48d7-86ea-0007bb263362', 1611626087643, '无', 0);
INSERT INTO `torder` VALUES ('f0dcf02e-b703-488a-93cf-8e674dc71716', 1611626107653, '无', 0);
INSERT INTO `torder` VALUES ('f0fd975a-5103-4c2e-8028-cd407d1a48ee', 1611626054625, '无', 0);
INSERT INTO `torder` VALUES ('f1d4fbc2-2c4d-4a54-9214-507fd0d0ba38', 1611626061628, '无', 0);
INSERT INTO `torder` VALUES ('f239eec9-bf4f-4c0f-9fbd-56e9756e8433', 1611626100649, '无', 0);
INSERT INTO `torder` VALUES ('f29c6b08-df56-4f9d-9e93-83b74989423e', 1611626104651, '无', 0);
INSERT INTO `torder` VALUES ('f2d8c86f-aeac-41e9-9dfd-58bac9e9c8f3', 1611626078637, '无', 0);
INSERT INTO `torder` VALUES ('f2ea8b5d-09a3-4ca5-9df4-d836dfd48092', 1611626064630, '无', 0);
INSERT INTO `torder` VALUES ('f52d6763-8605-48cc-8b72-7866b261d40b', 1611626050623, '无', 0);
INSERT INTO `torder` VALUES ('f8627bd4-66ca-4062-a923-47710447e6d9', 1611626088644, '无', 0);
INSERT INTO `torder` VALUES ('f9dc6269-8fab-41f8-bb3d-9f82b9983d7f', 1611626093647, '无', 0);
INSERT INTO `torder` VALUES ('fac6da27-0a7f-4c7e-8756-d6d37815d127', 1611626088644, '无', 0);
INSERT INTO `torder` VALUES ('fb54a13c-7a29-4dd6-a034-153dd77ae42e', 1611626073635, '无', 0);
INSERT INTO `torder` VALUES ('fbcfb8da-643d-4704-8eca-9132ee36a799', 1611626085641, '无', 0);
INSERT INTO `torder` VALUES ('fe76ac6d-5317-43e5-86b6-3daa2bb74430', 1611626100649, '无', 0);

2.jdbcForMysql工具类

此处为通用的MySQL工具类只需要更改jdbc连接地址以及账号密码即可

package cn.itcast.ontimer;

import java.sql.Connection;
import java.sql.DriverManager;

/**
 * Small JDBC helper that opens connections to the MySQL database used by this demo.
 *
 * <p>The JDBC URL, user name and password are hard-coded below; adjust them for
 * your environment.
 *
 * <p>Created by zkrsun, 2021/1/25 17:38.
 */
public class DButil {

    /** Fully qualified name of the JDBC driver class loaded before connecting. */
    private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

    /** JDBC URL of the database that holds the {@code torder} table. */
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/test";

    private static final String USER = "root";

    private static final String PASSWORD = "123456";

    /**
     * Opens a new JDBC connection using the constants above.
     *
     * <p>The caller owns the returned connection and is responsible for closing it.
     *
     * @return a freshly opened connection, never {@code null}
     * @throws IllegalStateException if the driver class cannot be loaded or the
     *         connection cannot be established; the original failure is attached
     *         as the cause
     */
    public static Connection getConnByJdbc() {
        try {
            Class.forName(DRIVER_CLASS);
            return DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
        } catch (Exception e) {
            // Fail fast: returning null here would only postpone the failure to a
            // NullPointerException at the first use of the connection.
            throw new IllegalStateException("Unable to open JDBC connection to " + JDBC_URL, e);
        }
    }
}

3.Flink数据输入

此处为自定义数据源,继承 RichParallelSourceFunction:一方面可以在 open 方法中建立数据库连接,另一方面该数据源支持并行(多个并行实例同时运行)。

package cn.itcast.ontimer;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * Flink source that reads every row of {@code torder} over JDBC, ordered by
 * {@code ordertime}, and emits one {@link Order} per second.
 *
 * <p>NOTE(review): this is a parallel source and every parallel instance runs the
 * same full-table query, so with a job parallelism of N each order is presumably
 * emitted N times. Confirm whether that is intended.
 *
 * <p>Created by zkrsun, 2021/1/26 10:11.
 */
public class SourceMysql extends RichParallelSourceFunction<Order> {

    Connection conn = null;
    PreparedStatement pst = null;
    String sql = "select * from torder order by ordertime";

    /** Cleared by {@link #cancel()} so that the emit loop in {@link #run} stops. */
    private volatile boolean running = true;

    /**
     * Opens the JDBC connection and prepares the query.
     *
     * @param parameters Flink configuration passed to the function (unused)
     * @throws Exception if the connection or the statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DButil.getConnByJdbc();
        pst = conn.prepareStatement(sql);
    }

    /**
     * Executes the query once and emits each row as an {@link Order}, sleeping one
     * second after every record. Returns when the result set is exhausted or the
     * source has been cancelled.
     *
     * @param ctx context used to emit records
     * @throws Exception if reading from the database fails or the sleep is interrupted
     */
    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        // try-with-resources so the result set is released even if emitting fails.
        try (ResultSet rows = pst.executeQuery()) {
            while (running && rows.next()) {
                String orderid = rows.getString("orderid");
                long ordertime = rows.getLong("ordertime");
                String orderreview = rows.getString("orderreview");
                int orderstate = rows.getInt("orderstate");
                ctx.collect(new Order(orderid, ordertime, orderstate, orderreview));
                // Throttle to one record per second.
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }

    /**
     * Closes the prepared statement and the connection if they were opened.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        if (pst != null) {
            pst.close();
            System.out.println("source端的预连接对象已关闭");
        }
        if (conn != null) {
            conn.close();
            System.out.println("source端的jdbc连接对象已关闭");
        }
    }

    /** Asks the emit loop in {@link #run} to stop after the current record. */
    @Override
    public void cancel() {
        running = false;
    }
}

代码的逻辑为:使用 jdbc 读取数据库中的数据,按照事件时间字段(ordertime)升序排序,将每条记录封装为自定义的订单对象,并以每秒一条的速度发送。

下面为订单对象代码

package cn.itcast.ontimer;/*** @ClassName:Order* @Description TODO* @Create By login name:zkrsun* @Date 2021/1/25 16:35*/
public class Order {private String orderId;private Long orderTime;private int orderState = 0; //0为未评价private String orderReview  = "未评价";public Order(String orderId, Long orderTime) {this.orderId = orderId;this.orderTime = orderTime;}public Order(String orderId, Long orderTime, int orderState, String orderReview) {this.orderId = orderId;this.orderTime = orderTime;this.orderState = orderState;this.orderReview = orderReview;}public Order() {}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public Long getOrderTime() {return orderTime;}public void setOrderTime(Long orderTime) {this.orderTime = orderTime;}public int getOrderState() {return orderState;}public void setOrderState(int orderState) {this.orderState = orderState;}public String getOrderReview() {return orderReview;}public void setOrderReview(String orderReview) {this.orderReview = orderReview;}
}

4.Flink定时器

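将获取到的数据,以下单时间(即数据的事件时间)为基准注册定时器:下单时间五秒后,如果订单评价状态仍为0(即用户没有进行手动商品评价),则自动将订单状态修改为1(商品已评价),并且将订单评价修改为“五星好评”。注意:下面的代码实际调用的是 registerProcessingTimeTimer(下单时间 + 5000),即处理时间定时器,而不是事件时间定时器;由于测试数据的下单时间早于当前系统时间,定时器在注册后会立即触发。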

package cn.itcast.ontimer;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Keyed process function that auto-reviews orders.
 *
 * <p>For every incoming {@link Order} it remembers the order id in keyed state and
 * registers a timer at {@code orderTime + 5000}. When the timer fires it looks up
 * the order's {@code orderstate} in {@code torder}; unless the state is already 1
 * it sets the state to 1 and the review to the five-star text in the update SQL.
 *
 * <p>This function never writes to the collector; its only effect is the database
 * update.
 *
 * <p>Created by zkrsun, 2021/1/26 9:36.
 */
public class OntimerFunction extends KeyedProcessFunction<Tuple, Order, Order> {

    /** Keyed state holding the id of the order the pending timer belongs to. */
    ValueState<String> vs = null;
    String querySql = "select orderstate from torder where orderid = ?";
    String updateSql = "update torder set orderstate = ?,orderreview='五星好评' where orderid = ?";
    Connection conn = null;
    PreparedStatement queryPst = null;
    PreparedStatement updatePst = null;

    /**
     * Registers the keyed state and prepares the two JDBC statements.
     *
     * @param parameters Flink configuration passed to the function (unused)
     * @throws Exception if the connection or a statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        vs = getRuntimeContext().getState(new ValueStateDescriptor<>("vs", String.class));
        conn = DButil.getConnByJdbc();
        queryPst = conn.prepareStatement(querySql);
        updatePst = conn.prepareStatement(updateSql);
    }

    /**
     * Stores the order id and schedules the auto-review timer.
     *
     * @param value the incoming order
     * @param ctx   context giving access to the timer service
     * @param out   collector (unused)
     * @throws Exception if updating state fails
     */
    @Override
    public void processElement(Order value, Context ctx, Collector<Order> out) throws Exception {
        vs.update(value.getOrderId());
        // NOTE(review): this is a processing-time timer keyed on the order's own
        // timestamp. For timestamps that are already in the past it presumably fires
        // right away rather than five seconds later - confirm whether an event-time
        // timer (plus watermarks in the job) was intended.
        ctx.timerService().registerProcessingTimeTimer(value.getOrderTime() + 5000);
    }

    /**
     * Applies the default review if the order is still unreviewed in the database.
     *
     * @param timestamp the timestamp the timer was registered for
     * @param ctx       timer context (unused)
     * @param out       collector (unused)
     * @throws Exception if a database operation fails
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Order> out) throws Exception {
        String orderid = vs.value();
        if (orderid == null) {
            // State was already cleared by an earlier firing for this key.
            return;
        }

        queryPst.setString(1, orderid);
        int orderstate = 0;
        // try-with-resources so the result set is released after every lookup.
        try (ResultSet found = queryPst.executeQuery()) {
            while (found.next()) {
                orderstate = found.getInt("orderstate");
            }
        }

        if (orderstate != 1) {
            updatePst.setInt(1, 1);
            updatePst.setString(2, orderid);
            updatePst.executeUpdate();
        }

        // The order has been handled; drop its state so it does not accumulate.
        vs.clear();
    }

    /**
     * Closes both prepared statements and the connection if they were opened.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        if (updatePst != null) {
            updatePst.close();
            System.out.println("定时器的更新预连接对象已关闭");
        }
        if (queryPst != null) {
            queryPst.close();
            System.out.println("定时器的查询预连接对象已关闭");
        }
        if (conn != null) {
            conn.close();
            System.out.println("jdbc连接对象已关闭");
        }
    }
}

5.Flink主程序入口

package cn.itcast.ontimer;import org.apache.flink.apimon.restartstrategy.RestartStrategies;
import org.apache.flink.apimon.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @ClassName:OrderAutoReview* @Description TODO* @Create By login name:zkrsun* @Date 2021/1/25 16:33*/
public class OrderAutoReview {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);  //设置检查点时间为5senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  //设置流时间为事件事件env.setParallelism(3);  //设置并行度为3env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));    //设置程序失败重启策略为延迟重启尝试三次每次间隔五秒env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置flink的一次性语义DataStreamSource<Order> streamSource = env.addSource(new SourceMysql());streamSource.keyBy("orderId").process(new OntimerFunction());env.execute();}
}

更多推荐

flink定时器实现订单自动好评

本文发布于:2024-03-10 03:37:25,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1726909.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:定时器   好评   订单   flink

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!