1 

Java并发

     并发编程可以是程序执行速度得到极大提高,或者为设计某些类型的程序提供更易用的模型,或者两者皆有。

1 并发的多面性

     用并发解决问题大体上可以分为“速度”和“设计可管理性”两种。

    1.1 更快的执行

        可能在多处理器上使用多线程技术我们更容易理解,但是,并发通常是提高运行在单处理器上的程序的性能,这听起来可能有违直觉。在单处理器上顺序执行在开销上确实更小,但是如果碰到一些操作(如I/O等)会造成阻塞,那么后续的任务就不能继续执行了,整个程序都将停下来,而使用并发编程就很好的解决了这个问题。(当然,如果没有阻塞任务,在单处理器上使用并发就毫无意义了)

    1.2 改进代码设计

         并发可以极大的简化程序设计。

2 基本的线程机制

    并发编程使我们可以将程序划分为多个分离的,独立运行的任务。通过使用多线程机制,这些独立应用(子线程)中的每一个都将由执行线程来驱动。一个线程就是在进程中的一个单一的顺序控制流,所以单个进程可以拥有多个并发执行的任务,程序使得每个任务都好像拥有自己的CPU一样,底层机制就是切分CPU时间,我们通常都不需要考虑这个。
    多任务和多线程往往是使用多处理器系统的最合理方式。

    2.1 定义任务(Runnable接口)

        (此处先说明一下:可能不了解java并发的,一开始看不明白,其实主要是任务和线程的一些概念不清楚,请继续向下看,会一步步解释的)

        要想定义任务,只需要实现Runnable接口并编写run()方法即可。

public class SimpleI implements Runnable {
    protected int countMax = 10;//倒计时
    private static int taskCount = 0;
    private final int id = taskCount++;//区分任务的多个实例
    public String status(){
        return "#" + id + "(" + (countMax>0?countMax:"over") + "),";
    }
    @Override
    public void run() {
        while(countMax-- > 0)
            System.out.print(status());
Thread.yield();//对线程调度器的一种建议:“我已经完成对生命周期最重要的部分,是时候切换了” } public static void main(String args[]){ new SimpleI().run(); new SimpleI().run(); } }
输出:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),

         这个任务并不是由单独的线程驱动的(其实也是由线程驱动,不过这个线程是main所在的那个线程),如果要实现真正的线程能力,需要显示的将一个任务附加到线程上。

    2.2 Thread类

         将Runnable对象提交给Thread构造器即可转变为一个工作任务:

public static void main(String args[]){
        Thread t = new Thread(new SimpleI());
        t.start();
        System.out.println("flag---:");
    }//输出:flag---:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),

         此时,main()线程中其他操作与new SimpleI().run()是“同时”执行的。

public static void main(String args[]){
        int i = 5;
        while(i-->0)
            new Thread(new SimpleI()).start();
        System.out.print("flag---:");
    }/*输出:
flag---:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(9),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(9),#3(8),#3(7),#3(6),#3(5),#3(4),#3(3),#3(2),#3(1),#3(over),#4(9),#4(8),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),*/

        当然,我们也可以直接继承Thread类,然后调用.start()方法,但是Runnable是一个接口,如果我们直接继承Therad的话,就没办法继承其他类了,还有就是Runnable更适合资源的共享。以下为其他写法:

 RunnableSelf.java:

public class RunnableSelf implements Runnable {
    public RunnableSelf() {
        new Thread(this).start();
    }
    @Override
    public void run() {}
}

ThreadSelf.java:

public class ThreadSelf extends Thread{
    public ThreadSelf() {
        start();
    }
    @Override
    public void run() {}
}

        如果相对简单,这可能是安全的,但是,如果另一个任务在构造器完成之前开始执行,这将引发问题,所以使用Excutor才是更优的选择(使用内部类也是一个很有用的方法,可以将代码隐藏在类内部)。

        在Java中,Thread类自身并不执行任何操作,只是驱动赋予它的任务。Runnable意为可猎取的,在此处这个借口名字选择并不是很好,或许使用Task就好多了。

      (注意:new的Thread对象是没法被回收的,可线程池对此作优化)

    2.3 Executor(执行器)

        执行器Executor可以管理Thread对象,Java提供4中不同的线程池。
        CachedThreadPool:将为每个任务都创建一个线程。一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
        FixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
        SingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
        ScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。

public static ExecutorService newFixedThreadPool(int nThreads)
//创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
//创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
//创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
//创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
    public static void cachedThread(){
        ExecutorService exec = Executors.newCachedThreadPool();
        int i = 5;
        while(i-- > 0)
            exec.execute(new SimpleI());
        exec.shutdown();
        System.out.println("cachedThread:");
    }
    public static void fixedThread(){
        ExecutorService exec = Executors.newFixedThreadPool(5);
        int i = 5;
        while(i-- > 0)
            exec.execute(new SimpleI());
        exec.shutdown();
        System.out.println("fixedThread:");
    }
    public static void singleThread(){
        ExecutorService exec = Executors.newSingleThreadExecutor();
        int i = 5;
        while(i-- > 0)
            exec.execute(new SimpleI());
        exec.shutdown();
        System.out.println("singleThread:");
    }

        调用后分别有如下输出:

cachedThread:
#1(9),#3(9),#2(9),#0(9),#4(9),#4(8),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),#2(8),#2(7),#3(8),#1(8),#3(7),#2(6),#3(6),#1(7),#3(5),#2(5),#3(4),#1(6),#3(3),#2(4),#3(2),#1(5),#3(1),#2(3),#3(over),#1(4),#1(3),#1(2),#2(2),#1(1),#2(1),#1(over),#2(over),
fixedThread:
#4(9),#3(9),#1(9),#2(9),#1(8),#0(9),#1(7),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(8),#3(7),#4(8),#3(6),#4(7),#3(5),#4(6),#3(4),#4(5),#3(3),#4(4),#3(2),#4(3),#3(1),#4(2),#3(over),#4(1),#4(over),
singleThread:
#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(9),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(9),#3(8),#3(7),#3(6),#3(5),#3(4),#3(3),#3(2),#3(1),#3(over),#4(9),#4(8),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),

        在任何线程池中,任何现有的线程,在可能的情况下,都会被自动复用。
        通常,CachedThread是合理的Executor的首选,它会创建与所需数量相当的线程,然后再回首线程时停止创建新线程。如果这种方式引发了问题,就得考虑切换到FixedChread了。

    2.4 线程池生命周期

         ExecutorService扩展了Executor并添加了一些生命周期管理的方法。
        一个Executor的生命周期有三种状态,运行 ,关闭 ,终止 。
        Executor创建时处于运行状态。
        当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,
        所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。 如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

    2.5 从任务中返回结果

         Runnable是执行工作的独立任务,但是不返回任何值。如果需要返回值,那么就实现Callable接口而不是Runnable。Callable(Java SE5中引入的)是一种具有类型参数的泛型,类型参数表示的是从call()中返回的值,并且必须使用ExecutorService.submit()调用:

public class CallableDemo implements Callable<String>{
    static int id = 0;
    @Override
    public String call() throws Exception {
        return System.currentTimeMillis() + ":" + id++ +",";
    }
    public static void main(String args[]) throws InterruptedException, ExecutionException{
        ExecutorService service = Executors.newCachedThreadPool();
        int i = 5;
        List<Future<String>> result = new ArrayList<>();
        while(i-- > 0)
            result.add(service.submit(new CallableDemo()));
        System.out.println("CallableDemo:" + System.currentTimeMillis());
        for(Future<String> f : result)
            System.out.print(f.get());
    }
}/*输出:CallableDemo:1448935006857
1448935006857:1,1448935006857:0,1448935006859:2,1448935006859:3,1448935006859:4,*/

         submit()方法会产生Future对象。可以调用isDone()方法查询Future是否已经完成。使用get()方法获取结果,使用get()会阻塞直到结果准备就绪。

    2.6 休眠

         可以直接调用sleep()将任务终止给定的时间,可以抛出InterruptedException异常,但是异常不能跨线程传播,所以必须在任务内处理,但这已经Old了,Java SE5引入了显示的sleep()版本,作为TimeUnit类的一部分:

//      Thread.sleep(100);  //old style
//      TimeUnit.MICROSECONDS.sleep(100);   //Java SE5/6

    2.7 优先级

        线程的优先级会将它的重要性传递给调度器,使得优先级越高的,将得到更高的执行频率。

        使用getPriority()和(任何时候)setPriority()读取和设置现有线程的优先级:

public class SimplePriorities implements Runnable {
    int priority;
    double d = 0.0;
    public SimplePriorities(int priority) {
        this.priority = priority;
    }
    @Override
    public void run() {
        Thread.currentThread().setPriority(priority);
        for(int i = 1;i<100000000;i++)
            d += (Math.PI + Math.E)/(double)i*Math.PI;
        System.out.print("priority:" + priority + ",");
        Thread.yield();//让步:工作得差不多了,可以让别的线程使用了O(∩_∩)O~
    }
    public static void main(String args[]){
        ExecutorService executorService = Executors.newCachedThreadPool();
        int i = 5;
        while(i-->0)
            executorService.execute(new SimplePriorities(Thread.MIN_PRIORITY));
        i = 5;
        while(i-->0)
            executorService.execute(new SimplePriorities(Thread.NORM_PRIORITY));
        executorService.execute(new SimplePriorities(Thread.MAX_PRIORITY));
        System.out.println("SimplePriorities:");
        executorService.shutdown();
    }
}/*SimplePriorities:
priority:10,priority:5,priority:5,priority:5,priority:5,priority:5,priority:1,priority:1,priority:1,priority:1,priority:1,*/

         这里,在线程中做了大量的浮点数运算,以保证线程能够消耗一定的资源,当然不同机器效果不一定相同,如果cpu能力一般,可将循环次数减少。

        虽然JDK提供了10个优先级,但是考虑到各大操作系统的差异性,映射关系并不稳定,所以通常只使用给定的3中级别。

     2.8 join()

        一个线程可以在其他线程之上调用join()方法:在被调用线程结束后继续执行,还可以带超时参数,中断join()方法的调用是在调用线程上调用interrupt()方法(结合try)。

 JoinDemo.java:

public class JoinDemo {
    public static void main(String args[]){
        Sleeper sleepy = new Sleeper("Sleepy", 1500),
                grumpy = new Sleeper("Grumpy", 1500);
        new Joiner("Dopey", sleepy);
        new Joiner("Doc", grumpy);
        grumpy.interrupt();
    }
}
class Sleeper extends Thread{
    int duration;
    public Sleeper(String name,int time) {
        super(name);
        duration = time;
        start();
    }
    @Override
    public void run() {
        try {
            sleep(duration);
        } catch (InterruptedException e) {
            System.out.println(getName() + " was interrupted. " + "isInterrupted():" + isInterrupted());    //被中断
            return;
        }
        System.out.println(getName() + " has awakened");//被唤醒
    }
}
class Joiner extends Thread{
    Sleeper sleeper;
    public Joiner(String name,Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        start();
    }
    @Override
    public void run() {
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        System.out.println(getName() + " join completed");
    }
}/*输出:Grumpy was interrupted. isInterrupted():false
Doc join completed
Sleepy has awakened
Dopey join completed*/

        Sleeper是一个Thread类型,在run中,sleep()可能会在指定的时间满后返回,也可能被中断。在catch里面,将根据isInterrupted()的返回值报告这个中断,当另一个线程在该线程上调用interrupt()时,将给该线程设定一个标志,表明该线程已经被中断了。异常被捕获时将清理这个标志,所以异常被捕获时,标志总是为假。
        Joiner线程将通过在Sleeper对象上调用join()来等待Sleeper醒来,从输出可以发现,Sleeper被中断或者正常结束,Joiner和Sleeper将一起结束(需看详细过程,可在Joiner中sleep一定时间并打印时附加System.currentTimeMillis()即可)。

     2.9 捕获异常

        由于线程的本质特征,我们是不能从线程中捕获逃逸的异常的。异常逃出任务的run方法后,就会向外传播到控制台:

public class ExceptionThread implements Runnable {
    @Override
    public void run() {
        throw new RuntimeException("anxpp.com");
    }
    public static void main(String args[]){
        try{
            Executors.newCachedThreadPool().execute(new ExceptionThread());
        }catch(RuntimeException e){}
    }
}/*输出:
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: anxpp.com
    at com.anxpp.concurrent.ExceptionThread.run(ExceptionThread.java:8)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)*/

        可以看到,即使在main中使用了try语句也不能捕获到异常。

        (Java SE5之前,可以通过线程组来捕获)现在,我们可以使用Excutor来捕获这些异常。为了解决问题,我们需要修改Executor产生线程的方式。Thread.UncaughtExceptionHandler(Java SE5中的新接口)允许在每个线程上都附着一个异常处理器,Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用:

public class CaptureThreadException {
    public static void main(String args[]){
        ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
        service.execute(new ExceptionThreadN());
        service.shutdown();
        System.out.println("CaptureThreadException:");
    }
}
class ExceptionThreadN implements Runnable{
    @Override
    public void run() {
        Thread thread = Thread.currentThread();
        System.out.println("run() by " + thread.getName());
        System.out.println("eh=" + thread.getUncaughtExceptionHandler());
        throw new RuntimeException("anxpp.com_exception");
    }
}
class MyUncaughtExceptionHandler implements UncaughtExceptionHandler{
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("caught:" + e);
    }
}
class HandlerThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        t.setName("T");
        System.out.println("created:" + t.getName());
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("eh=" + t.getUncaughtExceptionHandler());
        return t;
    }
}/*输出:
com.anxpp.concurrent.HandlerThreadFactory@5abe753a creating new Thread
created:T
eh=com.anxpp.concurrent.MyUncaughtExceptionHandler@5e9a94
run() by T
eh=com.anxpp.concurrent.MyUncaughtExceptionHandler@5e9a94
CaptureThreadException:
caught:java.lang.RuntimeException: anxpp.com_exception*/

         可以看到,未捕获的异常是通过uncaughtException来捕获的。如果需要在各处使用相同的异常处理器,可设置一个静态域,并将处理器设置为默认的。

 3 (受限)资源共享

     有了并发就可以同时做多件事情,但是,两个和多个线程彼此互相干涉的问题就出现了。

    3.1 引出问题

        首先,编写一个可以产生偶数的抽象类:

public abstract class IntGenerator {
    protected int value = 0;
    private volatile boolean canceled = false;
    public abstract int next();
    public void cancel(){   canceled = true; }
    public boolean isCanceled(){    return canceled; }
}

         然后编写一个测试类(可测试所有继承自上面这个类的子类),并实现抽象类后测试:

public class Check implements Runnable {
    IntGenerator g;
    final int id;
    public Check(IntGenerator g,int id) {
        this.id = id;
        this.g = g;
    }
    @Override
    public void run() {
        while(!g.isCanceled()){
            int val = g.next();
            if(val % 2 != 0){
                System.out.println("not pass:" + val);
                g.cancel();
            }
        }
    }
    public static void test(IntGenerator g,int count){
        ExecutorService service = Executors.newCachedThreadPool();
        for(int i = 0;i < count; i++)
            service.execute(new Check(g, i));
        service.shutdown();
    }
    public static void test(IntGenerator g){ test(g,20); }//default count
    public static void main(String args[]){
        System.out.println("begin:");
        test(new IntGenerator() {
            @Override
            public int next() {
                value++;////危险,此处放置yield会更快出现问题!
                value++;
                return value;
            }
        },10);
    }
}/*输出
begin: not pass:575 not pass:579 not pass:577 not pass:581*/ 

         一个任务可能会在另一个任务执行2次自增之间调用next()方法,造成程序最终运行失败。再者,Java中的自增并不是原子操作,在自增过程中任务也可能会被线程机制挂起!所以,即使是单一递增,不保护任务也不是安全的。

 

    3.2 解决共享资源竞争问题

        我们在使用多线程的时候,总是要多加谨慎,就像我们打LOL的时候,引导技能正准备最后一击带他升天的时候,敌人躺下了,因为你的线程挂起了,另一个玩家击杀了他。那么解决之道就是当资源被一个任务使用的时候,在其上加锁。基本所有的并发模式在解决线程冲突的时候,都是采用序列化访问共享资源的方案。在代码前加的锁语句产生了一种相互排斥的效果,就是常常被称为互斥量的机制。
        比如:大家都想单独使用卫生间(排除一些爱好比较广泛的人),为了使用,就先敲门,如果没人就进入并锁上门后使用,其他的人就不能进来只能等待直到可以使用。而且,人们也并没有排队,谁都可能是下一个敲门的人。可以通过yield()和setPriority()给线程调度器提供建议(虽然不一定有效,具体效果取决于具体平台和JVM的实现)。
        Java通过提供关键字 synchronized 的形式,提供内置支持。

        共享资源一般是以对象形式存在,但也可能是文件,其他I/O或打印机。要控制对共享资源的访问,得先将其包装进一个对象。然后把访问这个对象的方法标记为 synchronized :

synchronized void f(){/* ... */}

        (同步控制)改造以上方法:

    public static void main(String args[]){
        System.out.println("begin:");
        test(new IntGenerator() {
            @Override
            public synchronized int next() {
                value++;
                Thread.yield();//问了增加value为奇数时上下文切换的可能性
                value++;
                if(value%20000000 == 0) System.out.println(value);
                return value;
            }
        },100);
    }/*输出:
begin:
20000000
40000000
60000000
...一直都不会出问题*/

        任何时候,都只有一个任务可以通过由互斥量看护的代码。

        使用显示的Lock对象

    public static void main(String args[]){
        System.out.println("begin:");
        test(new IntGenerator() {
            private Lock lock = new ReentrantLock();
            @Override
            public int next() {
                lock.lock();
                try{
                    value++;
                    Thread.yield();
                    value++;
                    if(value%20000000 == 0) System.out.println(value);
                    return value;//必须在try语句里,不然unlock()会过早发生!
                }finally{
                    lock.unlock();
                }
            }
        },100);
    }

 

        显示的锁代码会不那么优雅,但是更加灵活。通常我们可以使用更简洁的方式,但在解决特殊问题时(如处理异常、清理工作等),就要使用显示的Lock了。

    3.3 原子性与易变量

        由于Java的跨平台特性,诸如自增等不像C++(通常情况下)一样是原子操作(即这个操作过程是不能中断的),一旦操作开始,它一定可以在可能发生的“上下文切换”之前执行完毕。所以在Java上我们应该避免使用原子性来代替同步。如果一定要玩火,请接受下面的测试:Goetz测试(以为并发专家命名的测试):如果你可以编写用于现代微处理器的高性能JVM,那么久有资格去考虑是否可以避免同步!

        易变性(当时在单片机上做c开发的时候就吃过亏),volatile意为易失的。本人也没做深入了解,就不说多了,毕竟也很少用,但有以下原子类,在设计性能调优时挺有用的,他们可以在机器级别上获得原子性:
        AtomicInteger,AtomicLong,AtomicReference等,具体如何操作请度娘,这里将改写上面的方法一边丢掉synchronized:

    public static void main(String args[]){
        test(new IntGenerator() {
            @Override
            public int next() {
                return i.addAndGet(2);
} },100); }/*这里消除了synchronized关键字,但程序不会失败!*/

     3.4 临界区(critical section)

        只希望防止多个线程访问方法内部分代码(同步控制块),可以用如下方式:

    synchronized(syncObject/*指定对象*/){
        //code here  对花括号内代码进行同步控制
    }

         使用同步控制块而不是对整个方法进行同步控制可以显著提高性能!

    3.5 在其他对象上同步

    3.6 线程本地存储

4 终结任务

 

 // (时间原因)未完待续