大數據

Flink 旁路輸出(分流)

官方文檔-關於分流

最好大家還是看官方文檔,我只是當一個搬運工

一、背景

在一些業務場景中,一個流中可能有多種類型的數據,比如訂單:有線上訂單,有線下訂單。當需要將不同類型的數據進行分別處理,比如 寫入到不同的數據表或者join 不同的其他流時,這個時候使用分流就比較合適。

二、官方常用的幾種方法

三、示範

本文只詳細介紹最常用 process 分流 ,道理都是相通的

直接上代碼偽碼,大家主要要理解,而不是直接複製代碼

//這是訂單source,最原始的流
val orderSource =  這是你構建source 的方法

  //創建線上訂單 tag
    val onlineOrderTag = new OutputTag[JSONObject]("onlineOrder")
   //創建線下訂單 tag
    val offlineOrderTag = new OutputTag[JSONObject]("offlineOrder")

   // 這個sideOutStream 就是分流之後的流對象
   val sideOutStream = orderSource
      .filter(new PaymentFilter)  // 這裡是一個過濾邏輯,如果你沒有可以不過濾
       // 這個process 就是分流的操作了
      .process(new ProcessFunction[String, JSONObject] {
        override def processElement(orderString: String, ctx: ProcessFunction[String, JSONObject]#Context, out: Collector[JSONObject]): Unit = {
          val outOrder = JSON.parseObject(orderString)
        
          //通過收銀員信息判斷是否屬於線下訂單
          if (!outOrder.containsKey("cashier_id") || StringUtils.isBlank(outOrder.getString("cashier_id"))) {
            ctx.output(onlineOrderTag, outOrder)
          } else {
            ctx.output(offlineOrderTag, outOrder)
          }
        }
      }
      )


val onlineStream = sideOutStream.getSideOutput(onlineOrderTag)

val offlineStream = sideOutStream.getSideOutput(offlineOrderTag)

// 流已經分好了,後面是sink 還是 去幹其他的,就看你的業務邏輯了
onlineStream.addSink()

offlineStream.addSink()

生產實踐

下圖是真實生產的一個DAG圖

內部使用了分流, join ,自定義剔除器 等滿足業務需求

後面會更新 join 和 自定義剔除器 trigger 等 實戰場景,感興趣的朋友可以加個關注喲

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *