探索 java.util.concurrent 包的内容

分享不易,感谢关注转发

什么是并发编程?

探索 java.util.concurrent 包的内容

在一个程序中,当多个线程似乎同时在运行时,我们就有了并发性。如果两个进程实际上同时运行,我们就有了并行性。

我们并没有深入探讨并发的概念,而是探索java并发包相关的内容。

注意:不可能在一篇文章中涵盖内容部分中提到的所有内容。

内容

  1. java.util.concurrent

a) Executors & Future

b) 并发集合

  1. java.util.concurrent.locks
  2. java.util.concurrent.atomic

这个包中包含的主要组件是——执行器队列、定时、同步器和并发集合

在本文中,我们将探讨在基于 Web 的应用程序中大量使用的两个组件——执行器和并发集合。

执行器框架中的类和接口及其层次结构如下图所示,

Executor

探索 java.util.concurrent 包的内容

ThreadPoolExecutorScheduledThreadPoolExecutor类之间的区别在于后者可用于调度任务执行,例如添加延迟等。在这个包中需要记住的另一个重要类是Executors,此类提供大量的工厂方法。

尝试使用接口并创建一个 ThreadPool 并通过从该池中获取线程来执行一些任务,用Executors工厂类中的方法来创建线程池。

package com.example.concurrent.config;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public static void main(String[] args) {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                // submit(Runnable task)
                executor.submit(() -> {
                        System.out.println("Implemented Runnable Interface"+Thread.currentThread().getName());
                        try {
                                Thread.sleep(10);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                });

                // submit(Runnable task, T result)
                executor.submit(() -> {
                        System.out.println("Implemented Runnable Interface"+Thread.currentThread().getName());
                }, 1);

                // submit(Callable task)
                executor.submit(() -> {
                        System.out.println("Implemented Callable Interface"+Thread.currentThread().getName());
                        return 1;
                });
                executor.shutdown();

        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;
        public Thread newThread(Runnable r) {
                Thread t = new Thread(r,""+i);
                i++;
                return t;
        }
}

在上面的示例中,创建了一个初始大小为 2 的 ThreadPool,还提供了我们自己的 ThreadFactory 实现,它是可选的。我们可以跳过我们自己的实现,执行器将采用默认的 ThreadFactory 实现,实现自己的工厂的原因是为新线程命名并确定哪个线程正在运行我们的任务。

接着,使用 ExecutorService 的submit()方法执行了三个任务请注意,它们三个都有不同的签名,任务类型可以是CallableRunnable类型的提交。

Runnable 和 Callable 执行类似的工作。它们都在新线程中执行任务,但Callable 可以返回结果,也可以抛出检查异常

Future 是一个带有签名接口 Future 的接口。它代表异步执行的结果。在上面的代码示例中,executor 的提交方法返回一个 Future。

concurrent 包中很多类都实现了这个接口,来看看 Future 接口最常用的实现CompletableFuture

  1. runAsync() :返回一个新的 CompletableFuture。接受一个 Runnable 对象。如果您想运行代码块并且不期望返回值,请使用此方法。
  2. supplyAsync():返回一个新的CompletableFuture。接受一个 Supplier 对象,并返回通过调用给定的 Supplier 获得的值。如果要运行代码块并期望返回值,请使用此方法。

请注意,这两种方法都可以选择接受 executor 实例作为参数。如果我们不为这些方法提供执行程序,则使用创建新线程ForkJoinPool.commonPool()。

我们已经实现了自己的 ThreadPool 并且可以控制默认创建的线程数,所以我更喜欢将 executor 的实例作为参数传递。

package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public static void main(String[] args) {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                //no return value
                CompletableFuture.runAsync(() -> {
                        System.out.println("Inside Runnable " + Thread.currentThread().getName());
                }, executor);
                
                //with return value
                CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
                        System.out.println("Inside Supplier " + Thread.currentThread().getName());
                        return true;
                        
                }, executor);
                
                //get blocks the thread and waits for the future to complete
                try {
                        System.out.println("Result from supplier " + cf.get());
                } catch (InterruptedException e) {
                        e.printStackTrace();
                } catch (ExecutionException e) {
                        e.printStackTrace();
                }

                
                //shutdown executor manually
                executor.shutdown();
        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;
        public Thread newThread(Runnable r) {
                Thread t = new Thread(r,""+i);
                i++;
                return t;
        }
}

3. thenAccept():获取 CompletedFuture 任务的结果并将其作为参数传递给 Consumer 对象。如果需要基于 Future 对象的结果运行任务并且不希望从该任务返回结果,请使用此方法。

4. thenApply():获取 CompletedFuture 任务的结果并将其作为参数传递给 Function 对象。如果需要基于Future 对象的结果运行任务并且需要从该任务返回结果,请使用此方法。

package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public static void main(String[] args) {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                // no return value
                CompletableFuture.runAsync(() -> {
                        System.out.println("Inside Runnable " + Thread.currentThread().getName());
                }, executor);

                // with return value
                CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
                        System.out.println("Inside Supplier " + Thread.currentThread().getName());
                        return true;

                }, executor);

                // accept the new Consumer object once the future is completed
                cf.thenAccept(flag -> {
                        if (flag)
                                System.out.println("Inside thenAccept() ");
                });

                // apply a Function object once the future is completed and returns boolean
                CompletableFuture applyResult = cf.thenApply(flag -> {
                        System.out.println("Inside thenApply() ");
                        return false;
                });

                // shutdown executor manually
                executor.shutdown();
        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;

        public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "" + i);
                i++;
                return t;
        }
}

5. thenCompose():如果想组合两个未来执行的结果,其中一个未来任务依赖于另一个未来任务的输出,请使用此方法。

package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public static void main(String[] args) throws InterruptedException, ExecutionException {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
                        return 1;
                }, executor);

                // result of cf is passed to another future object that invokes add method
                CompletableFuture result = cf.thenCompose(x -> {
                        return CompletableFuture.supplyAsync(() -> {
                                return add(x, 2);
                        }, executor);
                });

                System.out.println("Result " + result.get());

                // shutdown executor manually
                executor.shutdown();
        }

        public static int add(int x, int y) {
                return x + y;
        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;

        public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "" + i);
                i++;
                return t;
        }
}

6. thenCombine():如果你想组合两个独立的未来任务的输出,那么使用这个方法。

修改上面的代码以进行以下更改,我们将产生与上面的代码相同的结果。

CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
                        return 1;
                }, executor);

                CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
                        return 2;
                }, executor);
                
                // result of cf is passed to another future object that invokes add method
                CompletableFuture result = cf.thenCombine(cf1, (x,y) -> {
                        return add(x,y);
                });

                System.out.println("Result " + result.get());

并发集合

为多线程上下文设计的集合实现主要是

ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, CopyOnWriteArrayList, 和CopyOnWriteArraySet,这些集合应用场景有明显区别,例如,Collections.synchronizedList(new ArrayList());返回一个同步的 ArrayList,同步集合上的线程安全是通过在整个对象上应用线程锁来实现的。

CopyOnWriteArraySet容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。缺点两个 1)占用内存 2)最终一致性

内容   java   util
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章