# 线程的管理

# 线程的创建和运行

  1. 继承 Thread,重写 run 方法并执行 start 方法。
  2. 实现 Runnable 接口的 run 方法,并传递给 Thread 实例并执行 start 方法。
// File1: 继承 Thread
public class MyThread1 extends Thread {
    
    @Override
    public void run() {
        // do something!
    }
    
    public static void main(String[] args) {
        new MyThread1().start();
    }
    
}
// File2: 继承 Thread,匿名子类
public class MyThread2 {
    
    public static void main(String[] args) {
        new Thread() {
            
            @Override
            public void run() {
                // do something!
            }
            
        }.start();
    }
    
}
// File3: 实现 Runnable
public class MyRunnable1 implements Runnable {
    
    public void run() {
        // do something!
    }
    
    public static void main(String[] args) {
        Runnable r = new MyRunnable1();
        new Thread(r).start();
    }
    
}
// File4: 实现 Runnable,匿名子类
public class MyRunnable2 {
    
    public static void main(String[] args) {
        Runnable r = new Runnable() {
            
            public void run() {
                // do something!
            }
            
        };
        new Thread(r).start();
    }
    
}

# 获取和设置线程信息

  1. ID:每个线程的独特标识;
  2. Name:线程的名称;
  3. Priority:线程对象的优先级。优先级别从低到高为 1-10 之间。设置后,不建议改变它们的优先级。
  4. Status:线程的状态。在 Java 中,只有这 6 种:new,runnable,blocked,waiting,time waiting,terminated。

前提:Thread t = new Thread ();

  1. t.getId() : long;
  2. Name 的设置与获取
    • new Thread(String name);
    • new Thread(Thread t, String name);
    • new Thread(Runnable r, String name);
    • t.setName(String name);
    • t.getName() : String;
  3. 设置与获取 Priority
    • Thread.MAX_PRIORITY;
    • Thread.MIN_PRIORITY;
    • t.setPriority(int i);
    • t.getPriority() : int;
  4. State 是一个枚举类型,如:Thread.State.TIMED_WAITING
    • t.getState() : Thread.State;

# 线程的中断

Thread t = new Thread();

  1. 检测是否中断:t.isInterrupted () : boolean;
  2. 任务中断:t.interrupt () : void;
public class Interrupt implements Runnable {
    public void run() {
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("interrupted.");
                return;
            }
            System.out.println("It's not interrupted.");
            try {
                Thread.sleep(800L);
            } catch (InterruptedException e) {
                System.out.println("interrupted while this thread is sleep.");
                return;
            }
        }
    }
    public static void main(String[] args) {
        Runnable r = new Interrupt();
        Thread t = new Thread(r);
        t.start();
        try {
            Thread.sleep(5000L);
            t.interrupt();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

# 操作线程的中断机制

检测线程是否中断,如果中断,则可以抛出异常 InterruptedException,并捕获处理。Thread.sleep 操作就需要自己捕获异常并处理,参考上面的线程的中断代码里的 run。

# 线程的睡眠与唤醒

当你需要线程在一段时间后才执行,则可以让线程进入睡眠,睡眠这段时间消耗资源比较少。

  1. Thread.sleep(long milliseconds);
  2. TimeUnit.SECONDS.sleep (long seconds); 还有其它的时间单位。

# 等待线程的终结

Thread t = new Thread();

当前线程调用另外一个线程 t.join () 时,则当前线程会暂停,直到 t 线程完成了,才会继续运行。

join 还有另外几个方法:

  1. t.join(long milliseconds);
    • 等待线程 t 结束后,调用 join 的线程继续运行;
    • 线程 t 还没结束,等待 milliseconds 后,调用 join 的线程直接继续运行。
  2. t.join(long milliseconds, long nanos);
    • 跟第一个一样,只不过可以精确到纳秒。

# 守护线程的创建与运行

  1. 线程分用户线程与守护线程,GC 就是一个守护线程;
  2. 守护线程优先级非常低,所以守护线程不能处理重要的任务;
  3. 用户线程都结束时,守护线程自动结束;
  4. 守护线程的设置 setDaemon 只能在 start 方法前调用,否则抛 IllegalThreadStateException 异常;
  5. 与其它线程一样,只不过是在 start 方法前调用 setDaemon (true)。

Thread t = new Thread();

  • t.setDaemon(boolean daemon);
  • t.isDaemon() : boolean;

# 在线程中处理不受控制的异常

Java 里有 2 种异常:

  1. 检查异常(Checked exceptions): 这些异常必须强制捕获它们或在一个方法里的 throws 子句中。 例如, IOException 或者 ClassNotFoundException。
  2. 未检查异常(Unchecked exceptions): 这些异常不用强制捕获它们。例如, NumberFormatException, RuntimeException。

不用担心线程的检查异常,因为 run 不能 throws 异常,所以只能在 run 内部捕获并处理;

但 run 内部出现未检查异常,则需要进行下面处理:

  1. 实现 Thread.UncaughtExceptionHandler 的 uncaughtException 方法;
  2. 设置线程的未捕获异常处理器,setUncaughtExceptionHandler。

Thread 对未检查异常处理的顺序是:

  1. 检查线程是否有设置 handler;
  2. 如果没设置,则检查线程组有没有设置 handler;
  3. 如果没设置,则检查 Thread 有没有设置 setDefaultUncaughtExceptionHandler 全局默认 handler;
  4. 如果没有就将 stack trace 输出到控制台。
// GlobalExceptionHandler.java
public class GlobalExceptionHandler implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread thread, Throwable throwable) {
        // do something!
    }
}
// Test.java
public class Test {
    
    public static void main(String[] args) {
        Thread t = new Thread() {
            @Override
            public void run() {
                // do something!
            }
        };
        // 设置未捕获异常处理器
        t.setUncaughtExceptionHandler(new GlobalExceptionHandler());
        t.start();
    }
    
}

# 使用本地线程变量

# ThreadLocal

多线程共享同一个对象时,就会出现相互影响。可以使用本地线程变量,每个线程都会储存各自的副本。

ThreadLocal<String> local = new ThreadLocal<String>() {
    @Override
    protected String initialValue() {
        return "initial value";
    }
};
  1. local.get() : <T>;
  2. local.set(<T> value);
  3. local.remove();

如果线程 A 里面创建线程 B,则线程 B 调用 local.get () 得到的是初始值,即没有继承线程 A 的本地线程变量。

# InheritableThreadLocal

使用 InheritableThreadLocal 可以继承本地线程变量,InheritableThreadLocal 是 ThreadLocal 的子类;

InheritableThreadLocal<String> local = new InheritableThreadLocal<String>() {
    @Override
    protected String childValue(String s) {
        return super.childValue(s); // 得到父线程的值,以父线程的值做初始化
    }
    @Override
    protected String initialValue() {
        return "initial value";
    }
};

与 ThreadLocal 的唯一区别是:InheritableThreadLocal 多了 childValue 方法。

线程 A 创建子线程 B 时,则会为子线程 B 初始化 childValue 方法,该方法可以设置其它的值。

# 使用线程组

# 创建分组与设置

为了方便管理多个线程,可以为线程分组,统一管理。

根线程组为 system,默认线程组为 main。

  • system
    • main
      • group1(自定义)
        • thread1
        • thread2
        • subGroup1
      • group2(自定义)
ThreadGroup group = new ThreadGroup("worker");
ThreadGroup subGroup = new ThreadGroup(group, "subWorker");
Runnable r = new Runnable() {
    @Override
    public void run() {
        // do something!
    }
};
Thread w1 = new Thread(group, "worker1");
Thread w2 = new Thread(group, r);
Thread w3 = new Thread(group, r, "worker3");

# 获取 Group 的信息与管理

  1. 获取信息:
    • group.activeCount () : int; 返回活动线程的估计数。
    • group.activeGroupCount () : int; 返回活动线程组的估计数。
    • group.getName () : String; 返回该线程组名称。
    • grou. getMaxPriority () : int; 返回该线程组下最大的优先级。
    • group.getParent () : ThreadGroup; 返回该线程组的父线程组。
    • group.list () : void; 打印线程组信息。
    • group.checkAccess () : void; 确定当前运行的线程是否有权修改此线程组。
  2. 判断信息:
    • group.isDaemon () : boolean; 返回该线程组是否是守护线程。
    • group.isDestroyed () : boolean; 返回该线程组是否被销毁。
    • group.parentOf (ThreadGroup parent) : boolean; 判断指定线程组是否是该线程组的父级。
  3. 管理线程:
    • group.destroy (); 销毁线程。
    • group.interrupt (); 中断线程。
    • group.setDaemon (boolean b); 设置是否是守护线程组。
    • group.setMaxPriority (int max); 设置最大优先级。
    • group.uncaughtException (Thread t, Throwable throwable); 设置全局未捕获异常处理器。

# 处理线程组内的不受控制异常

继承 ThreadGroup 并重写 uncaughtException 方法。

public class MyThreadGroup extends ThreadGroup {
    public MyThreadGroup(String s) {
        super(s);
    }
    @Override
    public void uncaughtException(Thread thread, Throwable throwable) {
        super.uncaughtException(thread, throwable);
    }
}

# 用线程工厂创建线程

实现 ThreadFactory 接口的 newThread 方法。

public class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable runnable) {
        // do something!
        return null;
    }
    
}

# 基本线程同步

临界区:访问一个共享资源在同一时间不能超过一个线程执行的代码块。

# 同步方法

使用 synchronized 关键字控制并发线程。

# 根据修饰对象分类

  1. 修饰代码块
    • synchronized(this | object) {}
    • synchronized (类.class) {}
  2. 修饰方法
    • 修饰普通方法
    • 修饰静态方法

# 根据获取的锁分类

  1. 获取对象锁
    • synchronized(this | object) {}
    • 修饰普通方法
  2. 获取类锁
    • synchronized (类.class) {}
    • 修饰静态方法(静态方法内的 synchronized 块必须是类锁)

# 补充

  1. synchronized 关键字不能继承。
  2. 定义接口方法时,不能使用 synchronized 关键字。
  3. 构造方法不能使用 synchronized 关键字,但里面的代码块可以。
  4. 类锁和对象锁是互不干扰的。
  5. synchronized 消耗资源比较大,而且导致死锁发生,所以不是临界区的就不使用,尽量避免同步。

# 在同步的类里安排独立属性

你也可以在同步的类里面创建对象,使用该对象作为对象锁,而不必使用 this。

public class ShareClass {
    // 用于对象锁
    private final Object object = new Object();
    private long val;
    
    public ShareClass(long val) {
        this.val = val;
    }
    
    public void addVal(long val) {
        //object 锁
        synchronized (object) {
            this.val += val;
        }
    }
}

# 在同步代码中使用条件

缓冲区是一个共享的数据结构,我们必须采用同步机制,比如 synchronized 关键字来控制对它的访问。但是我们有更多的限制因素,如果缓冲区是满的,生产者不能存储数据,如果缓冲区是空的,消费者不能取出数据。

对于这些类型的情况,Java 在 Object 对象中提供 wait (),notify (),和 notifyAll () 方法的实现,这三个方法是 Object 里的,所以所有对象都有这三个方法。

在 synchronized 外部调用 wait () 会抛 IllegalMonitorStateException 异常,并且调用 wait () 方法后,JVM 会休眠,并释放 synchronized 锁。使用 notify (),和 notifyAll () 唤醒。

public class ShareClass {
    private final Object object = new Object();
    
    // 是否已经通知过了
    private boolean wasSignaled = false;
    private long val = 0;
    public void addVal(long val) {
        synchronized (object) {
            //while 防止丢失信号,假唤醒
            //wasSignaled 是否已经通知过了,如果通知过了,则不执行 wait ();
            while (!wasSignaled && /* other condition */) {
                try {
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // do something!
        }
    }
    
    public void notify() {
        // 已经通知过了
        wasSignaled = true;
        object.notify();
    }
}

使用时,要注意丢失信号和假唤醒

  1. 丢失信号:使用标记,如果通知过了,就不再执行 wait () 方法。
  2. 假唤醒:使用 while 来判断条件,而不是 if。

# 使用 Lock 来同步代码块

Java 提供另外的机制用来同步代码块。它比 synchronized 关键字更加强大,灵活和更好的性能。

Lock 接口:

  1. void lock();
  2. void unlock();
  3. boolean tryLock();
  4. boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException;
  5. Condition newCondition();
  6. void lockInterruptibly() throws InterruptedException;

Lock 接口的实现类:

  1. ReentrantLock
  2. ReentrantReadWriteLock.ReadLock
  3. ReentrantReadWriteLock.WriteLock

Lock 接口下的锁是手动释放的,所以建议在 try 块的 finally 下释放,防止异常导致锁没被释放。

# 使用读 / 写锁同步数据访问

  1. 如果有读操作,则有新的写操作时,写操作的线程就会阻塞;
  2. 如果有写操作,则有新的读操作或写操作,新的线程就会阻塞。

# 修改 Lock 的公平性

在 ReentrantLock 类和 ReentrantReadWriteLock 类的构造器中,允许一个名为 fair 的 boolean 类型参数,它允许你来控制这些类的行为。默认值为 false,这将启用非公平模式。

公平锁:排队,每个线程都有机会分到 CPU 时间片;
非公平锁:该线程会尝试获取锁,没有获取到就重新在队尾排队。

由于 tryLock () 方法并不会使线程进入睡眠,即使 Lock 接口正在被使用,这个公平属性并不会影响它的功能。

# 在 Lock 中使用多个条件

Lock lock = new Reentrant.....;
Condition c = lock.newCondition();
  1. c.await() : void throws InterruptedException;

  2. c.await(long time, TimeUnit unit) :boolean throws InterruptedException;

  3. c.awaitUntil(Date date) : boolean throws InterruptedException;

  4. c.awaitNanos(long nanos) : long throws InterruptedException;

  5. c.awaitUninterruptibly() : void;

  6. c.signal() : void;

  7. c.signalAll() : void;

# 线程同步工具

高级的机制来达到多线程的同步:

  1. Semaphores: 控制访问多个共享资源的计数器。此机制是并发编程的最基本的工具之一,而且大部分编程语言都会提供此机制。
  2. CountDownLatch: CountDownLatch 类是 Java 语言提供的一个机制,它允许线程等待多个操作的完结。
  3. CyclicBarrier: CyclicBarrier 类是又一个 java 语言提供的机制,它允许多个线程在同一个点同步。
  4. Phaser: Phaser 类是又一个 java 语言提供的机制,它控制并发任务分成段落来执行。全部的线程在继续执行下一个段之前必须等到之前的段执行结束。这是 Java 7 API 的一个新特性。
  5. Exchanger: Exchanger 类也是 java 语言提供的又一个机制,它提供 2 个线程间的数据交换点。

# 控制并发访问资源 (Semaphores)

创建信号量:资源的数量

// 默认非公平锁
Semaphore s = new Semaphore(1);
// 公平锁
Semaphore s = new Semaphore(1, true);

# 获取信号量

一直阻塞直到获取 n 个信号量成功或线程终止。

  1. s.acquire():void throws InterruptedException;
  2. s.acquire(int permits):void throws InterruptedException;

一直阻塞直到获取 n 个信号量成功,不会被打断

  1. s.acquireUninterruptibly();
  2. s.acquireUninterruptibly(int permits);

# 释放信号量

释放 n 个信号量。

  1. s.release();
  2. s.release(int permits);

# 尝试获取信号量

尝试获取 n 个信号量,如果没有则返回 false,该方法不阻塞,可以用在 if 上。

  1. s.tryAcquire();
  2. s.tryAcquire(int permits);
  3. s.tryAcquire(long timeout, TimeUnit unit);
  4. s.tryAcquire(int permits, long timeout, TimeUnit unit);

如果获取到,则处理完毕,需要 s.release 释放。

# 其它信息或控制

  1. s.drainPermits () : int 立即返回剩余的信号量,并置 0;
  2. s.availablePermits () : int 获取剩余的信号量个数;
  3. s.getQueueLength () : int 获取排队中等待 acquire 的线程数目;
  4. s.hasQueuedThreads () : boolean 判断是否还有在排队的线程;
  5. s.isFair () : boolean : 判断是否是公平锁;

# 等待多个并发事件完成

Java 并发 API 提供这样的类,它允许 1 个或者多个线程一直等待,直到一组操作执行完成。 这个类就是 CountDownLatch 类。它初始一个整数值,此值是线程将要等待的操作数。当某个线程为了想要执行这些操作而等待时, 它要使用 await () 方法。此方法让线程进入休眠直到操作完成。 当某个操作结束,它使用 countDown () 方法来减少 CountDownLatch 类的内部计数器。当计数器到达 0 时,这个类会唤醒全部使用 await () 方法休眠的线程们。

// 等待操作数
CountDownLatch c = new CountDownLatch(10);

当计时器为 0,全部唤醒后,就失效了,计时器不能增加,所以得新建对象才能重新使用。

# 一个或多个线程等待

  1. c.await () : void throws InterruptedException 等待直到计数器为 0,才被唤醒并运行;
  2. c.await (long timeout, Timeout unit) : void throws InterruptedException 等待直到计数器为 0 或超时,才被唤醒并运行;

# 减少计时器

  1. c.countDown () : 计时器 --,当计时器 == 0 时,会唤醒所有调过 await 的线程;

# 获取计时器的值

  1. c.getCount () : long 计时器的值;

# 在同一个点同步任务

CyclicBarrier 也叫同步屏障,在 JDK1.5 被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。

#   线程    屏障
线程1 -----> |
  线程2 ---> | 全部到达屏障后,才能通过--->
 线程3 ----> |
Runnable r = new Runnable() {
    @Override
    public void run() {
        // do something!
    }
};
# 5个线程
CyclicBarrier c = new CyclicBarrier(5);
# 5个线程结束后,执行r线程
CyclicBarrier c = new CyclicBarrier(5, r);

# 到达同步点后,等待

  1. c.await () : int throws InterruptedException, BrokenBarrierException 等待,直到所有的线程都调用 await,才唤醒;
  2. c.await (int timeout, TimeUnit unit) : int throws InterruptedException, BrokenBarrierException 等待,直到所有的线程都调用 await 或超时,才唤醒;

# 重置

与 CountDownLatch 不同的是,CyclicBarrier 可以重置回到原来的状态。

  1. c.reset() : void;

调用 reset 后,之前调用 await 的线程会抛出 broken 异常。

# 其它信息

  1. c.getNumberWaiting () : int 返回已经调用 await 方法的线程数;
  2. c.getParties () : int 返回剩余还没调用 await 方法的线程数;
  3. c.isBroken () : boolean 当其中一个线程被中止了,则其它线程也会收到 broken 异常,且该 barrier 的状态就变成 broken 状态。

# 运行阶段性并发任务

# 创建

Phaser p = new Phaser();
Phaser p = new Phaser(int parties);
Phaser p = new Phaser(Phaser parent);
Phaser p = new Phaser(Phaser parent, int parties);

# 注册

  1. p.register() : int;
  2. p.bulkRegister(int parties) : int;