求助:Flink 1.9 sql 两个表 Join 后如何做 CEP ?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// 构造订单数据
DataStream<Order> ordersData = env.fromCollection(Arrays.asList(
new Order("001", "iphone", new Timestamp(1545800002000L)),
new Order("002", "mac", new Timestamp(1545800003000L)),
new Order("003", "book", new Timestamp(1545800004000L)),
new Order("004", "cup", new Timestamp(1545800018000L))
))
.assignTimestampsAndWatermarks(new OrderTimestampExtractor());
// 构造付款表
DataStream<Payment> paymentData = env.fromCollection(Arrays.asList(
new Payment("001", "alipay", new Timestamp(1545803501000L)),
new Payment("002", "card", new Timestamp(1545803602000L)),
new Payment("003", "card", new Timestamp(1545803610000L)),
new Payment("004", "alipay", new Timestamp(1545803611000L))
))
.assignTimestampsAndWatermarks(new PaymentTimestampExtractor());
tEnv.registerDataStream("t_order", ordersData, "orderId, productName, orderTime");
tEnv.registerDataStream("t_payment", paymentData, "orderId, payType, payTime");
// 两表 JOIN
String sqlQuery = "SELECT o.orderId as orderId, o.productName as productName, \n" +
"p.payType as payType, o.orderTime as orderTime, \n" +
"cast(payTime as timestamp) as payTime\n" +
"FROM t_order AS o \n" +
"JOIN t_payment AS p \n" +
"ON o.orderId = p.orderId AND\n" +
"\tp.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR";
Table queryResult = tEnv.sqlQuery(sqlQuery);
tEnv.registerTable("TemporalJoinResult", queryResult);
String cepSQL = "select *, MATCH_ROWTIME() as rowtime from TemporalJoinResult\n" +
"\tMATCH_RECOGNIZE (\n" +
"\tORDER BY rowtime\n" +
" MEASURES\n" +
" A.orderId AS orderId,\n" +
" A.productName AS productName,\n" +
" A.orderTime AS orderTime,\n" +
"\t\tB.payTime AS payTime\n" +
"\tONE ROW PER MATCH \n" +
"\tAFTER MATCH SKIP PAST LAST ROW\n" +
" PATTERN (A B)\n" +
" DEFINE\n" +
" A AS payType = 'alipay',\n" +
" B AS productName = 'iphone'\n" +
"\t) as T";
Table cepResult = tEnv.sqlQuery(cepSQL);
tEnv.toAppendStream(cepResult, Row.class).print();
env.execute();
MATCH_RECOGNIZE 里边 ORDER BY rowtime 不清楚怎样指定?求大佬帮忙