从零开始的Java8-Lambda

引子

首先Lambda配合Stream拥有很强大的数据处理能力,并且能够以更加清晰的表达方式描述数据,大大减少了代码的冗余。在平常开发中,能大大提高开发效率,学习它的目的也正因为如此,此文介绍了一些Lambda相关的知识以及一些注意事项,避免滥用反而起到反作用。

Lambda基本介绍

Lambda:可以理解为一种匿名函数:它没有名称,但它有参数列表、函数主体、返回类型,可能还有一个可以抛出的异常列表。

Lambda示例

// 在以前,我们使用匿名类是这样:
Thread t = new Thread(new Runnable() { 
 public void run(){ 
    System.out.println("Hello world"); 
 } 
}); 
// 现在用Lambda表达式的话,看起来是这样:
Thread t = new Thread(() -> System.out.println("Hello world"));

从上面的例子中可以看出,采用匿名内部类和采用Lambda的写法,Lambda的写法明显更加精简和清晰了

Lambda最基本的构成

() -> System.out.println("Hello world");

  • 参数列表: 空参则括号里面什么都不写
  • ->: 把参数列表与Lambda主体分隔开
  • Lambda主体: 代码具体逻辑

带入参,并且有返回值

// 第一种写法,如果Lambda主体部分不带花括号,可以不用写return,返回的具体类型编译器会自动推断
(String s1, String s2) -> s1.concat(s2);

// 第二种写法,如果Lambda主体部分加了花括号,要带返回值必须加上return,否者就是Void类型的匿名函数
(String s1, String s2) -> {
    return s1.concat(s2);
};

默认方法

如果要在接口中添加新方法,则必须在实现该接口的类中提供其实现代码。为了解决这个问题,Java 8引入了默认方法的概念,它允许接口具有默认方法,而不会影响其实现类。默认方法不是抽象方法,子类实现了该接口会继承该默认实现,子类也可以覆盖该默认实现。
对于学习函数式接口关系不大,可以当做是一个新特性,如果不打算了解可以直接跳过。

子类可以继承接口的默认方法
// 定义接口1
interface MyInterface1 {
    default void defaultMethod() {
        System.out.println("defaultMethod1");
    }
}
// 定义接口2,接口2继承了接口1,也默认继承了接口1的默认方法
interface MyInterface2 extends MyInterface1 {
}

static class A implements MyInterface2 {
}

static class B implements MyInterface1 {
}
public static void main(String[] args) {
    A a = new A();
    a.defaultMethod();
    B b = new B();
    b.defaultMethod();
}
// 输出
// defaultMethod1
// defaultMethod1

子类可以覆盖接口的默认方法

// 定义接口1
interface MyInterface1 {
    default void defaultMethod() {
        System.out.println("defaultMethod1");
    }
}

static class A implements MyInterface1 {
    public void defaultMethod() {
        System.out.println("defaultMethod1 from MyInterface1");
    }
}

public static void main(String[] args) {
    A a = new A();
    a.defaultMethod();
}
// 输出
// defaultMethod1 from MyInterface1

子类实现了两个拥有相同默认方法,可以通过:接口名称.super.方法名()调用

// 定义接口1
interface MyInterface1 {
    default void defaultMethod() {
        System.out.println("defaultMethod1");
    }
}
interface MyInterface2 {
    default void defaultMethod() {
        System.out.println("defaultMethod2");
    }
}
static class A implements MyInterface1,MyInterface2 {

    // 此时必须实现该方法,否则通过不了编译
    @Override
    public void defaultMethod() {
        // 如果要调用MyInterface2的默认方法,可以使用MyInterface2.super.defaultMethod();
        MyInterface2.super.defaultMethod();
    }
}

public static void main(String[] args) {
    A a = new A();
    a.defaultMethod();
}
// 输出
// defaultMethod2

子类对接口默认方法调用规则

  1. 类中的方法优先级最高。类或父类中声明的方法的优先级高于任何声明为默认方法的优先级。
  2. 如果无法依据第一条进行判断,那么子接口的优先级更高:函数签名相同时,优先选择拥有最具体实现的默认方法的接口。
  3. 最后,如果还是无法判断,继承了多个接口的类必须通过显式覆盖和调用期望的方法,显式地选择使用哪一个默认方法的实现。

上面第1条规则以及第3条规则都已经展示过,下面展示第二条规则,该类图继承关系如下:

按照规则2,MyInterface2比MyInterface1更加具体,所以A会调用MyInterface2的defalutMethod

interface MyInterface1 {
    default void defaultMethod() {
        System.out.println("defaultMethod1");
    }
}

interface MyInterface2 extends MyInterface1{
    default void defaultMethod() {
        System.out.println("defaultMethod2");
    }
}

static class A implements MyInterface1,MyInterface2 {
}

public static void main(String[] args) {
    A a = new A();
    a.defaultMethod();
}
// 输出
// defaultMethod2

接口静态方法

与接口的默认方法类似,需要加上关键字static,静态方法需要,并且由于定义是完整的并且方法是静态的,因此在实现类中不能覆盖或更改这些方法。

interface MyInterface1 {
    default void defaultMethod() {
        System.out.println("defaultMethod1");
    }
    static void staticMethod() {
        System.out.println("staticMethod1");
    }
}
public static void main(String[] args) {
    MyInterface1.staticMethod();
}
// 输出
// staticMethod1

过于简单,就不上更多的例子了,下面直接说明与默认方法的区别就差不多了解了。

与默认方法的相同点:

  • 静态方法与默认方法必须要有默认的实现;
  • 静态方法与默认方法都不是抽象方法;

与默认方法的不同点:

  • 子类实现了该接口,子类不会继承静态接口方法,并且也不能覆盖静态接口方法,但是子类可以定义与父接口一样的静态方法,如果要调用接口的静态方法只能以接口名称.方法名()来调用;接口的默认方法子类是可以继承默认方法并且也可以覆盖的

函数式接口

说起Lambda,就必须了解函数式接口,因为要使用Lambda,必须在函数式接口上使用。
函数式接口:就是一个有且仅有一个抽象方法,但是可以有多个默认方法的接口,这样的接口可以隐式转换为Lambda表达式。一般在函数式接口上都有个注解@FunctionalInterface,该注解的作用类似@Override一样告诉编译器这是一个函数式接口,用于编译期间检测该接口是否仅有一个抽象方法,如果拥有多个则编译不通过。如下图所示

在函数式接口上使用lambda表达式

函数式接口可以被隐式转换为 lambda 表达式。
如下例子

Thread t = new Thread(() -> System.out.println("Hello world"));

我们可以看看Thread的构造:

public Thread(Runnable target) {
        init(null, target, "Thread-" + nextThreadNum(), 0);
}

其中入参为Runnable类型的接口,继续查看Runnable接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

可以看出在jdk1.8中,Runnable就是一个函数式接口

Runnable r1 = () -> System.out.println("Hello world")
// 等价于
Runnable r2 = new Runnable() {
    public void run(){ 
        System.out.println("Hello world"); 
    } 
}

run()方法签名:参数列表为空,返回为void;lambda签名:() -> void 参数列表为空,返回为void可以看出Runnable的run方法签名与lambda的签名匹配,我们将这种对方法抽象描述叫作函数描述符

在java8中,提供了很多函数式接口,可以用于描述各种Lambda表达式的签名

函数式接口 函数描述符
Predicate T->boolean
Consumer T->void
Function T->R
Supplier ()->T
UnaryOperator T->T
BiPredicate (L,R)->boolean
BiConsumer (T,U)->void
BiFunction (T,U)->R

这些都是较为常用的函数式接口,还有很多都在java.util.function包下,有兴趣可以自行查看。

Stream

一个新的抽象,称为流,可以以声明的方式处理数据。提供了一系列的api,使用类似sql语句直观的方式来提供对集合处理的高阶抽象。

另外还有两个特点:

  • 流水线:很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。这样做可以对操作进行优化, 比如延迟执行和短路。流水线的操作可以看作对数据源进行数据库式查询。
  • 内部迭代:以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式。

示例

// 创建一个1至10的集合
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
    .stream() //将集合转化成串行流
    .filter(i -> i % 2 == 0) //过滤掉奇数
    .limit(3) //取出前三个元素
    .map(Double::valueOf) // 将int类型转换成double类型
    .forEach(System.out::println); // 迭代并打印出元素
// 最终输出结果:2.0 4.0 6.0

如果采用传统的For-Each迭代方式来处理集合,可以想象代码可不是这短短这几行了。

整个流水线的操作包含两个

  • 中间操作:形成一条操作流水线。例如:filter()对流操作并返回一个流,limit()对流操作也会返回流,这样可以通过多个中间操作连接起来合成一个查询。注意:整个流水线中,除非触发了一个终端操作,否者中间操作不会执行任何处理。因为多个中间操作可以合并起来,在终端操作时一次性全部处理。
  • 终端操作:执行流水线,生成处理结果。

方法引用

上面例子中有一行代码为:map(Double::valueOf),::这个写法是什么意思呢?
实际上Double::valueOf就是一个方法引用。map(Double::valueOf)等价于map(element -> Double.value(element))

类名放在分隔符::前,方法的名称放在后面
例如,Double::valueOf就是引用了Double类中定义的方法valueOf,并且不需要加括号;
方法引用就是Lambda表达式的快捷写法,例如:

  • (Integer i) -> Double.valueOf(i) --- Double::valueOf
  • (String s) -> System.out.println(s) --- System.out::println
  • (str, i) -> str.substring(i) --- String::substring

方法引用的种类

  • 静态方法引用:ClassName::methodName
  • 实例上的实例方法引用:instanceReference::methodName
  • 超类上的实例方法引用:super::methodName
  • 类型上的实例方法引用:ClassName::methodName
  • 构造方法引用:Class::new
  • 数组构造方法引用:TypeName[]::new

生成流

  • Collection的stream()方法或者parallelStream() ,例如Arrays.asList(1,2,3).stream()。
  • Arrays.stream(Object[]) 例如Arrays.stream(new int[]{1,2,3})。
  • 使用流的静态方法,比如Stream.of(Object[]), IntStream.range(int, int) 或者 Stream.iterate(Object, UnaryOperator) ,如Stream.iterate(0, n -> n * 2) , 或者generate(Supplier s) 如Stream.generate(Math::random)。
  • BufferedReader.lines() 从文件中获得行的流。
  • Files类的操作路径的方法,如list、find、walk等。
  • 随机数流Random.ints()。
  • 其它一些类提供了创建流的方法,如BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), 和 JarFile.stream()。
  • 更底层的使用StreamSupport,它提供了将Spliterator转换成流的方法。
// 列举一些常用创建流的例子
List list = Arrays.asList(1, 2, 3);
// Collection的stream方法
Stream stream = list.stream();
// Stream的of方法
Stream> stream2 = Stream.of(list);
// BufferedReader的lines方法
BufferedReader bufferedReader = new BufferedReader(new FileReader("filePath"));
Stream lines = bufferedReader.lines();

中间操作

filter

Stream filter(Predicate predicate)
返回此流中匹配元素组成的流

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
        .stream()
        .filter(i -> i % 2 == 0) //过滤掉奇数
        .forEach(System.out::println); // 终端操作,打印结果
// 输出:2 4 6 8 10

map

Stream map(Function mapper)
返回一个流,该流的元素映射成另外的值,新的值类型可以与原来的类型不同

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
        .stream()
        .map(i -> i + "str ") // 转换成String
        .forEach(System.out::print); // 终端操作,打印结果
// 输出:1str 2str 3str 4str 5str 6str 7str 8str 9str 10str

mapToInt

IntStream mapToInt(ToIntFunction mapper)
返回一个IntStream,该流的元素映射成int类型的流 IntStream:原始流

List strings = Arrays.asList("1","2","3");
        strings
                .stream()
                .mapToInt(Integer::parseInt) // 转换成int
                .forEach(System.out::println); // 终端操作,打印结果
// 输出:int类型的 1 2 3

mapToLong,mapToDouble与mapToInt类似只不过原始类型不同而已,下面会单独讲解这三个原始流的作用及区别。

flatMap

Stream flatMap(Function> mapper)
返回一个Stream,和map类似,不同的是会将每个元素的扁平化。
flatMap的理解可能稍微有点难,通过下面两个例子来展示。
例子1:引用java8实战中的例子 将集合中的两个字符串根据字母去重复

List strings = Arrays.asList("Hello", "World");
List list = strings
                .stream() // 将集合转成流
                .map(s -> s.split("")) // 转换成['H','e','l','l','o'],['W','o','r','l','d'] 两个数组
                .flatMap(Arrays::stream) // 将两个数组扁平化成为['H','e','l','l','o','W','o','r','l','d'],实际上还是把两个数组再次转成流
                .distinct() // 去除重复元素
                .collect(Collectors.toList()); // 终端操作,转化成集合
System.out.println(list);
// 输出: [H, e, l, o, W, r, d]

引入java8实战的流程图如下:

例子2:

List numbers1 = Arrays.asList(1, 2, 3);
List numbers2 = Arrays.asList(4, 5, 6);
Stream.of(numbers1, numbers2) // 将两个集合转成流
        .flatMap(numbers -> numbers.stream()) // 两个集合流扁平化为[1,2,3,4,5,6]
        .forEach(System.out::println);
// 输出: 1,2,3,4,5,6

下图帮助理解扁平化流

distinct

Stream distinct()
过滤流中重复的元素

Arrays.asList(1, 2, 3, 2, 3, 4)
                .stream()
                .distinct() // 去除重复
                .forEach(System.out::println);
// 输出: 1234

sorted

Stream sorted()
对流中的元素顺序排序

Arrays.asList(1, 3, 5, 2, 4)
                .stream()
                .sorted() // 顺序排序
                .forEach(System.out::println);
// 输出: 12345

上面的例子中只支持顺序排序,如果要倒序呢?Stream中sorted还提供了一个重载方法:
Stream sorted(Comparator comparator);
可以通过传入Comparator来实现自己的排序规则

Arrays.asList(1, 3, 5, 2, 4)
                .stream()
                .sorted(Comparator.reverseOrder()) // 倒序排序
                .forEach(System.out::println);
// 输出: 54321

limit

Stream limit(long maxSize)
截取流,返回一个不超过给定长度的流

Arrays.asList(1, 2, 3, 4, 5)
                .stream()
                .limit(3) // 截取前三个元素
                .forEach(System.out::println);
// 输出: 123

skip

Stream skip(long n)
跳过给定长度的流

Arrays.asList(1, 2, 3, 4, 5)
                .stream()
                .skip(2) // 跳过前两个元素
                .forEach(System.out::println);
// 输出: 345

parallel

S parallel()
将流转成并行流

Arrays.asList(1, 2, 3, 4, 5)
                .stream()
                .parallel() // 转成并行流
                .forEach(System.out::println);
// 由于是并行的,每次输出结果都会不一致
// 输出: 1 5 2 4 3
parallel线程安全需要注意的点

直接上例子

// 反例1
for (int i = 0; i < 5; i++) {
    List list = new ArrayList<>();
    IntStream.rangeClosed(1, 1000).parallel().forEach(element->{
        list.add(element);
    });
    System.out.println(list.size());
}

// 输出:981 990 962 ...... 多次运行会发现每次结果都不一样,并且有时还会报ArrayIndexOutOfBoundsException数组越界
// 这边体现了在多线程中操作共享变量引发的问题,例如list容器当前容量为50,两个线程同时进入方法体,此时线程A持有的list里面有49个元素,线程B持有的list里面也是49个元素,然后线程A执行list.add()完成,此时容器内的元素的数量有50,由于线程之间不可见,线程B也进入到了add方法并且过了list容器扩容的检查,然后添加元素时发生ArrayIndexOutOfBoundsException

//  如果要能安全的新增,那么可以使用线程安全的容器
List list = Collections.synchronizedList(new ArrayList<>());
List list = new CopyOnWriteArrayList<>();

// 反例2
List list = new ArrayList<>(1000);
long count = IntStream.rangeClosed(1, 1000).parallel().map(element -> {
    list.add(element);    
    return element;
}).count();

long count = IntStream.rangeClosed(1, 1000).parallel().peek(element -> {
    list.add(element);
}).count();

// 使用并行流时,不要去操作共享变量,以上例子皆为反例
parallel性能上需要注意的点

对并行流的效率进行测试,每台机器上的结果可能不一致,请自行注意。下面例子全部采用遍历五次,取其中最快的一次。

// 串行与并行流效率测试 基于i7 8核cpu
// 对100_000_000求和
// for求和性能测试
static void testFor(long size) {
    List timeList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        long sum = 0;
        long start = System.currentTimeMillis();
        for (long j = 0L; j <= size; j++) {
            sum += j;
        }
        long end = System.currentTimeMillis();
        timeList.add((end - start));
    }
    System.out.println("For 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}

// 并行流求和性能测试
static void testParallel(long size) {
    List timeList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        long start = System.currentTimeMillis();
        Stream.iterate(0L, (element -> element + 1L)).limit(size).parallel().reduce(0L,Long::sum);
        long end = System.currentTimeMillis();
        timeList.add((end - start));
    }
    System.out.println("ParallelStream 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}

public static void main(String[] args) {
    // 初始值
    long size = 10_000_000L;
    testFor(size);
    testParallelStream(size);
}
// 输出为:
// For 处理时间:5ms
// ParallelStream 处理时间:254ms

为什么并行的会比传统For要慢,是因为Stream.iterate生成的是装箱对象,在求和过程中,装箱对象需要拆箱,计算完还会在装箱,数据量越大,那么采用装箱对象计算则会越慢。可以稍微更改一行代码
Stream.iterate(0L, (element -> element + 1)).limit(size).parallel().reduce(0L,Long::sum);更改为
Stream.iterate(0L, (element -> element + 1)).mapToLong(Long::longValue).limit(size).parallel().reduce(0L,Long::sum);,这边生成流的时候先转成原始流,然后在去做计算

// 并行流求和性能测试
static void testParallelStream(long size) {
    List timeList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        long start = System.currentTimeMillis();
        Stream.iterate(0L, (element -> element + 1L)).mapToLong(Long::longValue).limit(size).parallel().reduce(0L,Long::sum);
        long end = System.currentTimeMillis();
        timeList.add((end - start));
    }
    System.out.println("ParallelStream 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
    // 初始值
    long size = 10_000_000L;
    testFor(size);
    testParallelStream(size);
}
// 输出为:
// For 处理时间:5ms
// ParallelStream 处理时间:143ms

// 可以看出提升了接近一倍的性能,在数据量更大的情况下,会更高。
// 在java8里,还提供了3个生成原始流的对象:LongStream,DoubleStream,IntStream,下面直接测试采用原始流来做测试
static void testParallelLongStream(long size) {
    List timeList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        long start = System.currentTimeMillis();
        LongStream.rangeClosed(0, size).parallel().sum();
        long end = System.currentTimeMillis();
        timeList.add((end - start));
    }
    System.out.println("ParallelLongStream 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
    // 初始值
    long size = 10_000_000L;
    testFor(size);
    testParallelStream(size);
    testParallelLongStream(size)
}
// 输出为:
// For 处理时间:5ms
// ParallelStream 处理时间:148ms
// ParallelLongStream 处理时间:1ms

虽然将序列流转成并行流很容易,但是不恰当的使用反倒会成为负优化。在数据量不大的情况下,并行不一定比顺序的要快,反倒要慢上很多,因为数据量小的情况下,在线程的上下文切换之间的开销已经大于数据处理的开销了。以及在做数值计算的情况下,要留意是否是装箱对象,自动装箱拆箱在数据量大起来会成为性能上的累赘。

下面再看一个例子

static void testStructure(Collection c) {
    List timeList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        long start = System.currentTimeMillis();
        c.parallelStream().reduce(0L, Long::sum);
        long end = System.currentTimeMillis();
        timeList.add((end - start));
    }
    // 取五次中最快的一次 
    System.out.println("处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}

public static void main(String[] args) {
    // 使用ArrayList容器
    ArrayList arrayList = Stream.iterate(1L, a -> a + 1L).limit(10_000_000L).collect(toCollection(ArrayList::new));
    // 使用LinkedList容器
    LinkedList linkedList = Stream.iterate(1L, a -> a + 1L).limit(10_000_000L).collect(toCollection(LinkedList::new));
    testStructure(linkedList);
    testStructure(arrayList);
}
// 输出
// 处理时间:420ms
// 处理时间:36ms

在选用数据结构上,可以看出ArrayList在并行中效率要高于LinkedList,这是因为ArrayList的拆分效率比LinkedList高得多,前者用不着遍历就可以平均拆分,而后者则必须遍历。

按照可分解性总结了一些流数据源适不适于并行

数据源 可分解性
ArrayList 极佳
IntStream.range 极佳
HashSet
TreeSet
LinkedList
Stream.iterate
parallel操作上需要注意的点

并行流底层使用的是java7引入的Fork/Join(并发框架),它可以以并行的方式将任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体的结果。此文不多描述,有兴趣者自行查阅。需要注意的是,使用并行流时,内部使用了默认的 ForkJoinPool,池的大小为默认的cup核数-1(java8实战说的是默认核数,如果看过此书的请自行测试),Runtime.getRuntime().availableProcessors()来查看cpu的核心数量。

parallel运行时监控的线程数

在使用并行流时请注意,如果为IO密集型的并行,如果在多处使用,极有可能会影响所有的并行流,因为使用的是系统全局的ForkJoinPool,当池子里的线程被占用了,那么别处要使用线程只能等待它被释放。

// 模拟8个任务,独占线程并且不释放
Runnable runnable = () -> IntStream.rangeClosed(1, 8).parallel().forEach(c -> {
    try {
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(10000); 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
// 启用任务
new Thread(runnable).start();
System.out.println("任务开始");
// 等待一会,让池子里的线程充分被占用
Thread.sleep(1000);
IntStream.rangeClosed(0, 1000).parallel().forEach(c -> {
    try {
        // 打印当前前程,查看是否使用了ForkJoinPool中的线程
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

输出结果如下

可以看出,当池子里的线程被占用完,别的地方使用了并行流,完全变成了单线程执行。如果要避免这种情况,可以设置JVM启动参数
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16来设置ForkJoinPool的大小,也可以使用代码System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16")来设置全局的参数,以上两种方法及其不推荐,因为它将影响所有的并行流,推荐使用自定义ForkJoinPool的方式,如下所示

Runnable runnable = () -> IntStream.rangeClosed(1, 8).parallel().forEach(c -> {
    try {
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(10000); 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
// 启用任务
new Thread(runnable).start();
System.out.println("任务开始");
// 设置一个容量为10的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
// 执行任务
ForkJoinTask submit = forkJoinPool.submit(() -> {
    IntStream.rangeClosed(0, 20).parallel().forEach(c -> {
        try {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
});
while (!submit.isDone()) {
    Thread.sleep(500);
}

输出结果如下

sequential

S sequential()
将流转成序列流

Arrays.asList(1, 2, 3, 4, 5)
                .parallelStream()
                .sequential() // 转成序列流
                .forEach(System.out::println);
// 输出: 1 2 3 4 5

终端操作

allMatch,anyMatch,noneMatch

boolean anyMatch(Predicate predicate)
anyMatch:此流的任意元素有一个匹配返回ture,都不匹配返回false

boolean allMatch(Predicate predicate)
allMatch:此流的所有元素是都匹配返回ture,否者为false

boolean noneMatch(Predicate predicate)
noneMatch:此流中没有一个元素匹配返回ture,否者返回false

// 全部匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).allMatch(i -> i >= 5)); // true
System.out.println(Stream.of(5, 6, 7, 8, 9).allMatch(i -> i > 5)); // false

// 任意一个匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).anyMatch(i -> i > 5)); // true
System.out.println(Stream.of(5, 6, 7, 8, 9).anyMatch(i -> i > 9)); // false
                
// 都不匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).noneMatch(i -> i > 5)); // false
System.out.println(Stream.of(5, 6, 7, 8, 9).noneMatch(i -> i > 9)); // true

reduce

聚合操作 sum()、max()、min()、count()调用的都是reduce
Optional reduce(BinaryOperator accumulator)
无初始值,按传入的lambda的累加规则来聚合数据

// 无默认值,求和
Optional sum1 = Arrays.asList(1, 2, 3, 4, 5)
                            .stream()
                            .reduce((a, b) -> a + b);
System.out.println(sum1.get()); // 输出:15

T reduce(T identity, BinaryOperator accumulator)
第一个参数为初始值,第二个参数为累加器(归并数据的lambda)

// 有默认值,求和
Integer sum2 = Arrays.asList(1, 2, 3, 4, 5)
                    .stream()
                    .reduce(5, (a, b) -> a + b);
System.out.println(sum2); // 输出:20

// 求最大值
Integer max = Arrays.asList(1, 2, 3, 4, 5)
                    .stream()
                    .reduce(0, Integer::max); // 也可以写成 reduce(0, (a, b) -> a > b ? a : b);
System.out.println(max); // 输出:20

U reduce(U identity,BiFunction accumulator,BinaryOperator combiner)
combiner:合并器,用于合并累加器的值,这个参数只有在并行流下才会生效
reduce操作可以并行进行,为了避免竞争,每个reduce线程都会有独立的result,combiner的作用在于合并每个线程的result得到最终结果。

Integer reduce = Arrays.asList(1, 2, 3, 4, 5)
                        .parallelStream()
                        .reduce(0, (a, b) -> a + b, (c, d) -> c + d);
System.out.println(reduce); // 输出:20
reduce在并行流中的注意事项
System.out.println(
    Arrays.asList(1, 2, 3)
        .parallelStream()
        .reduce(0,(a, b) -> (a - b),(c, d) -> c + d)
);
// 如果无意料,那么输出将会是 -6,当运行程序的时候结果却是 -2,这与我们的预期结果大大不符
// 为什么会是-3呢,那么在序列流和并行流结果不一致,将以上代码修改一下,把参数和线程打印出来
System.out.println(
    Arrays.asList(1, 2, 3)
        .parallelStream()
        .reduce(0,
           (a, b) -> {
               System.out.format("a:%s b:%s  Thread:%s /n", a, b, Thread.currentThread().getName());
               return a - b;
           },
           (c, d) -> {
               System.out.format("c:%s d:%s Thread:%s /n", c, d, Thread.currentThread().getName()); 
               return c - d;
           }
));

输出如下

累加器的输出:0-2,0-3,0-1
合并器的输出:-2 - (-3),-1-1
执行流程如下图所示

在并行流中,reduce计算的方式与序列流不同,这归根于fork/join的特殊性,所有任务不断拆分,如果有初始值,那么会在累加阶段会以每个初始值与流中的数据累加,例如初始值为1,执行一个求和的累加,那么如果有N个元素,那么最终结果值为SUM + (N * 1),在相乘,相加,相减等等计算在使用并行流时需要好好考虑由并行带来的影响,当然如果只是聚合计算(sum,avg,max,min)可以放心的使用,如果采用自定义计算规则,那么一定需要谨慎使用,并测试。

findFirst,findAny

Optional findFirst()
返回此流的第一个元素的Optional,如果流为空,则返回空Optional。
Optional findAny()
返回此流的任意一个元素的Optional,如果流为空,则返回空Optional。
findFirst在并行流中的执行代价非常大,需要注意

Optional first = Arrays.asList(1, 2, 3, 4, 5)
                                .stream().findFirst();
System.out.println(first.get()); // 输出 1

Optional any = Arrays.asList(1, 2, 3, 4, 5)
                            .stream().findAny();
System.out.println(any.get()); // 因为是顺序流,所以输出1

collect

R collect(Collector collector)
收集,对数据做聚合,将流转换为其他形式,比如List,Map,Integer,Long...

// 准备一些初始数据
@Data
@AllArgsConstructor
class Student {
    private String name;    
    private Integer age;
}

// 初始化数据
Student student1 = new Student("zhangsan", 20);
Student student2 = new Student("lisi", 15);
Student student3 = new Student("wangwu", 10);
Student student4 = new Student("zhaoliu", 20);
List students = Arrays.asList(student1, student2, student3, student4);
// 如果要取出所有学生的姓名并转成集合可以写成
List names = students.stream()
                                .map(Student::getName) // 获取name
                                .collect(Collectors.toList()); // 转成List
System.out.println(names); // 输出:[zhangsan, lisi, wangwu, zhaoliu]

// 以年龄为key,姓名为value转成Map可以写成
Map map = students.stream()
                                .collect(Collectors.toMap(Student::getAge, Student::getName)); // 此写法会有问题,如果Map的key重复了,会报java.lang.IllegalStateException: Duplicate key  如果可以确保key不会重复就可以省略第三个参数        

Map map = students.stream()
                                .collect(Collectors.toMap(Student::getAge, Student::getName, (first, second) -> second)); // 前面两个参数是映射key和value,第三个参数为如果key重复了要如何处理,是保留旧的还是选择新的
System.out.println(map); // 输出:{20=zhaoliu, 10=wangwu, 15=lisi}  因为zhangsan和zhaoliu的年龄都是20,按照我们的策略,始终选择新的,所以key为20的value是zhaoliu

Map> groupByAge = students.stream()
                                .collect(Collectors.groupingBy(Student::getAge)); // 根据age分组
System.out.println(groupByAge);
// 输出:{20=[Student(name=zhangsan, age=20), Student(name=zhaoliu, age=20)], 10=[Student(name=wangwu, age=10)], 15=[Student(name=lisi, age=15)]}

R collect(Supplier supplier,BiConsumer accumulator,BiConsumer combiner)
supplier:定义一个容器
accumulator:该容器怎么添加流中的数据
combiner:容器如何去聚合

// 仿Collectors.toList(),简单实现一个toList()
// 1.定义一个List容器
// 2.调用List的add方法将元素添加到容器中
// 3.采用List的addAll方法聚合容器
List toList = Arrays.asList(1, 2, 3, 4).stream().collect(ArrayList::new, List::add, List::addAll);
System.out.println(toList);
// 输出:[1, 2, 3, 4]

// 仿Collectors.toMap(),简单实现toMap()
// 1.定义一个Map容器
// 2.调用Map的merge方法将元素添加到容器中
// 3.采用Map的putAll方法聚合容器
Map map = students.stream()
                                        .collect(HashMap::new, 
                                            (holder, element) -> {
                                                holder.merge(element.getAge(), element.getName(), (u, v) -> {
                                                return u;        
                                                // throw new IllegalStateException(String.format("Duplicate key %s", u));
                                            });
                                        }, Map::putAll);
System.out.println(map);
// 输出:{20=zhangsan, 10=wangwu, 15=lisi}

总结

  1. lambda由参数列表,箭头,主体组成。
  2. 函数式接口只能拥有一个抽象方法,可以拥有多个默认方法,多个静态方法。
  3. 方法引用实际就是Lambda的快捷写法。
  4. 流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。你可以从原始数据源那里再获得一个新的流来重新遍历一遍。
  5. 并行流是采用ForkJoin实现的。
  6. 在并行流中,不要在peek,map中不要去修改外部数据。
  7. 并行流使用需要注意,不要靠猜测,请多测试。
  8. 接口默认方法,优先级最低,子类会继承默认方法并且可以覆盖默认方法。如果因为多继承问题引起冲突(子类实现了两个接口,两个接口都拥有相同的方法名,相同函数描述符),那么必须覆盖该方法,如果期望调用某接口中的默认方法,可以使用X.super.m(…)来显示调用哪个接口的默认方法。
  9. 接口静态方法,子类不会继承,也不能覆盖,但是可以定义一个名称相同返回值相同的普通或静态方法。

版权声明:
作者:Mr李
链接:https://www.techfm.club/p/44822.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>