Apache Flink——侧输出流(side output)

前言

flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;

flink中的侧输出,就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据;

简单理解就是,根据业务上的一定规则,将一个源中的数据拆分成不同的流,即主流和侧输出流。

侧输出流(side output)

大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。

除了从DataStream操作输出主结果流外,也可以生成任一数量的额外的侧输出流。结果流可以和主输出流的类型可以不匹配,并且侧输出流可以有不同类型。侧输出流的操作当你分流时非常有用,之前你需要先复制一个流再过滤出来,有了侧输出流,就不需要这样操作。

具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文
的.output()方法就可以了。

DataStream stream = env.addSource(...);

SingleOutputStreamOperator longStream stream.process(new ProcessFunction() {

    @Override
    public void processElement( Integer value, Context ctx, Collector out) throws Exception {
        // 转换成 Long,输出到主流中
        out.collect(Long.valueOf(value));
        // 转换成 String,输出到侧输出流中
        ctx.output(outputTag, "side-output: " + String.valueOf(value));
    }

});

当使用侧输出流时,首先需要定义一个OutputTag,它将要被用来确定一个侧输出流。

OutputTag outputTag = new OutputTag("side-output") {};

注意:侧输出流的类型是根据侧输出流包括元素的类型来确定。

如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput() 方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。

DataStream stringStream = longStream.getSideOutput(outputTag);

可以从以下方法中来把数据输出到侧输出流

在以上的函数中可以用参数Context来暴露给用户发送数据到侧输出流。下面例子是用ProcessFunction来发送数据到侧输出流。

import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import scala.Tuple3;

import java.time.Duration;

/**
 * @Author: huangyibo
 * @Date: 2022/7/19 0:34
 * @Description:
 */

public class SideOutputStreamTest {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);

        //乱序流的Watermark生成
        SingleOutputStreamOperator stream = env.addSource(new ClickSource())
                // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 2s
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // 抽取时间戳的逻辑
                                .withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.getTimestamp())
                );

        OutputTag> maryTag = new OutputTag>("Mary"){};
        OutputTag> boboTag = new OutputTag>("Bobo"){};

        SingleOutputStreamOperator processStream = stream.process(new ProcessFunction() {
            @Override
            public void processElement(Event event, Context context, Collector out) throws Exception {
                if ("Mary".equals(event.getUser())) {
                    context.output(maryTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
                } else if ("Bobo".equals(event.getUser())) {
                    context.output(boboTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
                } else {
                    out.collect(event);
                }
            }
        });

        processStream.print("else");
        processStream.getSideOutput(maryTag).print("Mary");
        processStream.getSideOutput(boboTag).print("Bobo");

        env.execute();
    }
}
import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Author: huangyibo
 * @Date: 2022/7/7 0:00
 * @Description: 测试迟到数据
 */

public class LateDataTest {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);

        //乱序流的Watermark生成
        SingleOutputStreamOperator streamOperator = env.addSource(new ClickSource())
                // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 2s
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // 抽取时间戳的逻辑
                                .withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.getTimestamp())
                );

        //定义一个输出标签
        OutputTag lateTag = new OutputTag("late"){};

        //统计每个url的访问量
        SingleOutputStreamOperator result = streamOperator.keyBy(Event::getUrl)
                //滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //允许窗口处理迟到数据 允许1分钟的延迟
                .allowedLateness(Time.minutes(1))
                //将最后的迟到数据输出到侧输出流
                .sideOutputLateData(lateTag)
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        result.print("result");
        result.getSideOutput(lateTag).print("late");

        env.execute();
    }


    /**
     * 自定义实现AggregateFunction, 增量计算url页面的访问量,来一条数据就 +1
     */
    public static class UrlViewCountAgg implements AggregateFunction {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }


    /**
     * 自定义实现ProcessWindowFunction, 包装窗口信息输出
     */
    public static class UrlViewCountResult extends ProcessWindowFunction {

        @Override
        public void process(String url, Context context, Iterable iterable, Collector out) throws Exception {
            Long urlCount = iterable.iterator().next();
            //集合窗口信息输出
            long start = context.window().getStart();
            long end = context.window().getEnd();
            UrlViewCount urlCountView = new UrlViewCount();
            urlCountView.setUrl(url);
            urlCountView.setCount(urlCount);
            urlCountView.setWindowStart(start);
            urlCountView.setWindowEnd(end);
            out.collect(urlCountView);
        }
    }
}

Flink侧输出流有两个作用

  • 1、分隔过滤:充当filter算子功能,将源中的不同类型的数据做分割处理。因为使用filter 算子对数据源进行筛选分割的话,会造成数据流的多次复制,导致不必要的性能浪费,过滤后不需要的数据可以重新写入Pulsar或Kafka的topic中供其他应用消费处理。

  • 2、延时数据处理:在做对延时迟窗口计算时,对延时迟到的数据进行处理,即时数据迟到也不会造成丢失。

参考:
https://blog.csdn.net/rustwei/article/details/121102439

版权声明:
作者:zhangchen
链接:https://www.techfm.club/p/45541.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>