# 线程的管理
# 线程的创建和运行
- 继承 Thread,重写 run 方法并执行 start 方法。
- 实现 Runnable 接口的 run 方法,并传递给 Thread 实例并执行 start 方法。
// File1: 继承 Thread | |
public class MyThread1 extends Thread { | |
@Override | |
public void run() { | |
// do something! | |
} | |
public static void main(String[] args) { | |
new MyThread1().start(); | |
} | |
} | |
// File2: 继承 Thread,匿名子类 | |
public class MyThread2 { | |
public static void main(String[] args) { | |
new Thread() { | |
@Override | |
public void run() { | |
// do something! | |
} | |
}.start(); | |
} | |
} | |
// File3: 实现 Runnable | |
public class MyRunnable1 implements Runnable { | |
public void run() { | |
// do something! | |
} | |
public static void main(String[] args) { | |
Runnable r = new MyRunnable1(); | |
new Thread(r).start(); | |
} | |
} | |
// File4: 实现 Runnable,匿名子类 | |
public class MyRunnable2 { | |
public static void main(String[] args) { | |
Runnable r = new Runnable() { | |
public void run() { | |
// do something! | |
} | |
}; | |
new Thread(r).start(); | |
} | |
} |
# 获取和设置线程信息
- ID:每个线程的独特标识;
- Name:线程的名称;
- Priority:线程对象的优先级。优先级别从低到高为 1-10 之间。设置后,不建议改变它们的优先级。
- Status:线程的状态。在 Java 中,只有这 6 种:new,runnable,blocked,waiting,time waiting,terminated。
前提:Thread t = new Thread ();
- t.getId() : long;
- Name 的设置与获取
- new Thread(String name);
- new Thread(Thread t, String name);
- new Thread(Runnable r, String name);
- t.setName(String name);
- t.getName() : String;
- 设置与获取 Priority
- Thread.MAX_PRIORITY;
- Thread.MIN_PRIORITY;
- t.setPriority(int i);
- t.getPriority() : int;
- State 是一个枚举类型,如:Thread.State.TIMED_WAITING
- t.getState() : Thread.State;
# 线程的中断
Thread t = new Thread();
- 检测是否中断:t.isInterrupted () : boolean;
- 任务中断:t.interrupt () : void;
public class Interrupt implements Runnable { | |
public void run() { | |
while (true) { | |
if (Thread.currentThread().isInterrupted()) { | |
System.out.println("interrupted."); | |
return; | |
} | |
System.out.println("It's not interrupted."); | |
try { | |
Thread.sleep(800L); | |
} catch (InterruptedException e) { | |
System.out.println("interrupted while this thread is sleep."); | |
return; | |
} | |
} | |
} | |
public static void main(String[] args) { | |
Runnable r = new Interrupt(); | |
Thread t = new Thread(r); | |
t.start(); | |
try { | |
Thread.sleep(5000L); | |
t.interrupt(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
# 操作线程的中断机制
检测线程是否中断,如果中断,则可以抛出异常 InterruptedException,并捕获处理。Thread.sleep 操作就需要自己捕获异常并处理,参考上面的线程的中断代码里的 run。
# 线程的睡眠与唤醒
当你需要线程在一段时间后才执行,则可以让线程进入睡眠,睡眠这段时间消耗资源比较少。
- Thread.sleep(long milliseconds);
- TimeUnit.SECONDS.sleep (long seconds); 还有其它的时间单位。
# 等待线程的终结
Thread t = new Thread();
当前线程调用另外一个线程 t.join () 时,则当前线程会暂停,直到 t 线程完成了,才会继续运行。
join 还有另外几个方法:
- t.join(long milliseconds);
- 等待线程 t 结束后,调用 join 的线程继续运行;
- 线程 t 还没结束,等待 milliseconds 后,调用 join 的线程直接继续运行。
- t.join(long milliseconds, long nanos);
- 跟第一个一样,只不过可以精确到纳秒。
# 守护线程的创建与运行
- 线程分用户线程与守护线程,GC 就是一个守护线程;
- 守护线程优先级非常低,所以守护线程不能处理重要的任务;
- 用户线程都结束时,守护线程自动结束;
- 守护线程的设置 setDaemon 只能在 start 方法前调用,否则抛 IllegalThreadStateException 异常;
- 与其它线程一样,只不过是在 start 方法前调用 setDaemon (true)。
Thread t = new Thread();
- t.setDaemon(boolean daemon);
- t.isDaemon() : boolean;
# 在线程中处理不受控制的异常
Java 里有 2 种异常:
- 检查异常(Checked exceptions): 这些异常必须强制捕获它们或在一个方法里的 throws 子句中。 例如, IOException 或者 ClassNotFoundException。
- 未检查异常(Unchecked exceptions): 这些异常不用强制捕获它们。例如, NumberFormatException, RuntimeException。
不用担心线程的检查异常,因为 run 不能 throws 异常,所以只能在 run 内部捕获并处理;
但 run 内部出现未检查异常,则需要进行下面处理:
- 实现 Thread.UncaughtExceptionHandler 的 uncaughtException 方法;
- 设置线程的未捕获异常处理器,setUncaughtExceptionHandler。
Thread 对未检查异常处理的顺序是:
- 检查线程是否有设置 handler;
- 如果没设置,则检查线程组有没有设置 handler;
- 如果没设置,则检查 Thread 有没有设置 setDefaultUncaughtExceptionHandler 全局默认 handler;
- 如果没有就将 stack trace 输出到控制台。
// GlobalExceptionHandler.java | |
public class GlobalExceptionHandler implements Thread.UncaughtExceptionHandler { | |
public void uncaughtException(Thread thread, Throwable throwable) { | |
// do something! | |
} | |
} | |
// Test.java | |
public class Test { | |
public static void main(String[] args) { | |
Thread t = new Thread() { | |
@Override | |
public void run() { | |
// do something! | |
} | |
}; | |
// 设置未捕获异常处理器 | |
t.setUncaughtExceptionHandler(new GlobalExceptionHandler()); | |
t.start(); | |
} | |
} |
# 使用本地线程变量
# ThreadLocal
多线程共享同一个对象时,就会出现相互影响。可以使用本地线程变量,每个线程都会储存各自的副本。
ThreadLocal<String> local = new ThreadLocal<String>() { | |
@Override | |
protected String initialValue() { | |
return "initial value"; | |
} | |
}; |
- local.get() : <T>;
- local.set(<T> value);
- local.remove();
如果线程 A 里面创建线程 B,则线程 B 调用 local.get () 得到的是初始值,即没有继承线程 A 的本地线程变量。
# InheritableThreadLocal
使用 InheritableThreadLocal 可以继承本地线程变量,InheritableThreadLocal 是 ThreadLocal 的子类;
InheritableThreadLocal<String> local = new InheritableThreadLocal<String>() { | |
@Override | |
protected String childValue(String s) { | |
return super.childValue(s); // 得到父线程的值,以父线程的值做初始化 | |
} | |
@Override | |
protected String initialValue() { | |
return "initial value"; | |
} | |
}; |
与 ThreadLocal 的唯一区别是:InheritableThreadLocal 多了 childValue 方法。
线程 A 创建子线程 B 时,则会为子线程 B 初始化 childValue 方法,该方法可以设置其它的值。
# 使用线程组
# 创建分组与设置
为了方便管理多个线程,可以为线程分组,统一管理。
根线程组为 system,默认线程组为 main。
- system
- main
- group1(自定义)
- thread1
- thread2
- subGroup1
- group2(自定义)
- group1(自定义)
- main
ThreadGroup group = new ThreadGroup("worker"); | |
ThreadGroup subGroup = new ThreadGroup(group, "subWorker"); | |
Runnable r = new Runnable() { | |
@Override | |
public void run() { | |
// do something! | |
} | |
}; | |
Thread w1 = new Thread(group, "worker1"); | |
Thread w2 = new Thread(group, r); | |
Thread w3 = new Thread(group, r, "worker3"); |
# 获取 Group 的信息与管理
- 获取信息:
- group.activeCount () : int; 返回活动线程的估计数。
- group.activeGroupCount () : int; 返回活动线程组的估计数。
- group.getName () : String; 返回该线程组名称。
- grou. getMaxPriority () : int; 返回该线程组下最大的优先级。
- group.getParent () : ThreadGroup; 返回该线程组的父线程组。
- group.list () : void; 打印线程组信息。
- group.checkAccess () : void; 确定当前运行的线程是否有权修改此线程组。
- 判断信息:
- group.isDaemon () : boolean; 返回该线程组是否是守护线程。
- group.isDestroyed () : boolean; 返回该线程组是否被销毁。
- group.parentOf (ThreadGroup parent) : boolean; 判断指定线程组是否是该线程组的父级。
- 管理线程:
- group.destroy (); 销毁线程。
- group.interrupt (); 中断线程。
- group.setDaemon (boolean b); 设置是否是守护线程组。
- group.setMaxPriority (int max); 设置最大优先级。
- group.uncaughtException (Thread t, Throwable throwable); 设置全局未捕获异常处理器。
# 处理线程组内的不受控制异常
继承 ThreadGroup 并重写 uncaughtException 方法。
public class MyThreadGroup extends ThreadGroup { | |
public MyThreadGroup(String s) { | |
super(s); | |
} | |
@Override | |
public void uncaughtException(Thread thread, Throwable throwable) { | |
super.uncaughtException(thread, throwable); | |
} | |
} |
# 用线程工厂创建线程
实现 ThreadFactory 接口的 newThread 方法。
public class MyThreadFactory implements ThreadFactory { | |
@Override | |
public Thread newThread(Runnable runnable) { | |
// do something! | |
return null; | |
} | |
} |
# 基本线程同步
临界区:访问一个共享资源在同一时间不能超过一个线程执行的代码块。
# 同步方法
使用 synchronized 关键字控制并发线程。
# 根据修饰对象分类
- 修饰代码块
- synchronized(this | object) {}
- synchronized (类.class) {}
- 修饰方法
- 修饰普通方法
- 修饰静态方法
# 根据获取的锁分类
- 获取对象锁
- synchronized(this | object) {}
- 修饰普通方法
- 获取类锁
- synchronized (类.class) {}
- 修饰静态方法(静态方法内的 synchronized 块必须是类锁)
# 补充
- synchronized 关键字不能继承。
- 定义接口方法时,不能使用 synchronized 关键字。
- 构造方法不能使用 synchronized 关键字,但里面的代码块可以。
- 类锁和对象锁是互不干扰的。
- synchronized 消耗资源比较大,而且导致死锁发生,所以不是临界区的就不使用,尽量避免同步。
# 在同步的类里安排独立属性
你也可以在同步的类里面创建对象,使用该对象作为对象锁,而不必使用 this。
public class ShareClass { | |
// 用于对象锁 | |
private final Object object = new Object(); | |
private long val; | |
public ShareClass(long val) { | |
this.val = val; | |
} | |
public void addVal(long val) { | |
//object 锁 | |
synchronized (object) { | |
this.val += val; | |
} | |
} | |
} |
# 在同步代码中使用条件
缓冲区是一个共享的数据结构,我们必须采用同步机制,比如 synchronized 关键字来控制对它的访问。但是我们有更多的限制因素,如果缓冲区是满的,生产者不能存储数据,如果缓冲区是空的,消费者不能取出数据。
对于这些类型的情况,Java 在 Object 对象中提供 wait (),notify (),和 notifyAll () 方法的实现,这三个方法是 Object 里的,所以所有对象都有这三个方法。
在 synchronized 外部调用 wait () 会抛 IllegalMonitorStateException 异常,并且调用 wait () 方法后,JVM 会休眠,并释放 synchronized 锁。使用 notify (),和 notifyAll () 唤醒。
public class ShareClass { | |
private final Object object = new Object(); | |
// 是否已经通知过了 | |
private boolean wasSignaled = false; | |
private long val = 0; | |
public void addVal(long val) { | |
synchronized (object) { | |
//while 防止丢失信号,假唤醒 | |
//wasSignaled 是否已经通知过了,如果通知过了,则不执行 wait (); | |
while (!wasSignaled && /* other condition */) { | |
try { | |
object.wait(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
// do something! | |
} | |
} | |
public void notify() { | |
// 已经通知过了 | |
wasSignaled = true; | |
object.notify(); | |
} | |
} |
使用时,要注意丢失信号和假唤醒
- 丢失信号:使用标记,如果通知过了,就不再执行 wait () 方法。
- 假唤醒:使用 while 来判断条件,而不是 if。
# 使用 Lock 来同步代码块
Java 提供另外的机制用来同步代码块。它比 synchronized 关键字更加强大,灵活和更好的性能。
Lock 接口:
- void lock();
- void unlock();
- boolean tryLock();
- boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException;
- Condition newCondition();
- void lockInterruptibly() throws InterruptedException;
Lock 接口的实现类:
- ReentrantLock
- ReentrantReadWriteLock.ReadLock
- ReentrantReadWriteLock.WriteLock
Lock 接口下的锁是手动释放的,所以建议在 try 块的 finally 下释放,防止异常导致锁没被释放。
# 使用读 / 写锁同步数据访问
- 如果有读操作,则有新的写操作时,写操作的线程就会阻塞;
- 如果有写操作,则有新的读操作或写操作,新的线程就会阻塞。
# 修改 Lock 的公平性
在 ReentrantLock 类和 ReentrantReadWriteLock 类的构造器中,允许一个名为 fair 的 boolean 类型参数,它允许你来控制这些类的行为。默认值为 false,这将启用非公平模式。
公平锁:排队,每个线程都有机会分到 CPU 时间片;
非公平锁:该线程会尝试获取锁,没有获取到就重新在队尾排队。
由于 tryLock () 方法并不会使线程进入睡眠,即使 Lock 接口正在被使用,这个公平属性并不会影响它的功能。
# 在 Lock 中使用多个条件
Lock lock = new Reentrant.....; | |
Condition c = lock.newCondition(); |
c.await() : void throws InterruptedException;
c.await(long time, TimeUnit unit) :boolean throws InterruptedException;
c.awaitUntil(Date date) : boolean throws InterruptedException;
c.awaitNanos(long nanos) : long throws InterruptedException;
c.awaitUninterruptibly() : void;
c.signal() : void;
c.signalAll() : void;
# 线程同步工具
高级的机制来达到多线程的同步:
- Semaphores: 控制访问多个共享资源的计数器。此机制是并发编程的最基本的工具之一,而且大部分编程语言都会提供此机制。
- CountDownLatch: CountDownLatch 类是 Java 语言提供的一个机制,它允许线程等待多个操作的完结。
- CyclicBarrier: CyclicBarrier 类是又一个 java 语言提供的机制,它允许多个线程在同一个点同步。
- Phaser: Phaser 类是又一个 java 语言提供的机制,它控制并发任务分成段落来执行。全部的线程在继续执行下一个段之前必须等到之前的段执行结束。这是 Java 7 API 的一个新特性。
- Exchanger: Exchanger 类也是 java 语言提供的又一个机制,它提供 2 个线程间的数据交换点。
# 控制并发访问资源 (Semaphores)
创建信号量:资源的数量
// 默认非公平锁 | |
Semaphore s = new Semaphore(1); | |
// 公平锁 | |
Semaphore s = new Semaphore(1, true); |
# 获取信号量
一直阻塞直到获取 n 个信号量成功或线程终止。
- s.acquire():void throws InterruptedException;
- s.acquire(int permits):void throws InterruptedException;
一直阻塞直到获取 n 个信号量成功,不会被打断。
- s.acquireUninterruptibly();
- s.acquireUninterruptibly(int permits);
# 释放信号量
释放 n 个信号量。
- s.release();
- s.release(int permits);
# 尝试获取信号量
尝试获取 n 个信号量,如果没有则返回 false,该方法不阻塞,可以用在 if 上。
- s.tryAcquire();
- s.tryAcquire(int permits);
- s.tryAcquire(long timeout, TimeUnit unit);
- s.tryAcquire(int permits, long timeout, TimeUnit unit);
如果获取到,则处理完毕,需要 s.release 释放。
# 其它信息或控制
- s.drainPermits () : int 立即返回剩余的信号量,并置 0;
- s.availablePermits () : int 获取剩余的信号量个数;
- s.getQueueLength () : int 获取排队中等待 acquire 的线程数目;
- s.hasQueuedThreads () : boolean 判断是否还有在排队的线程;
- s.isFair () : boolean : 判断是否是公平锁;
# 等待多个并发事件完成
Java 并发 API 提供这样的类,它允许 1 个或者多个线程一直等待,直到一组操作执行完成。 这个类就是 CountDownLatch 类。它初始一个整数值,此值是线程将要等待的操作数。当某个线程为了想要执行这些操作而等待时, 它要使用 await () 方法。此方法让线程进入休眠直到操作完成。 当某个操作结束,它使用 countDown () 方法来减少 CountDownLatch 类的内部计数器。当计数器到达 0 时,这个类会唤醒全部使用 await () 方法休眠的线程们。
// 等待操作数 | |
CountDownLatch c = new CountDownLatch(10); |
当计时器为 0,全部唤醒后,就失效了,计时器不能增加,所以得新建对象才能重新使用。
# 一个或多个线程等待
- c.await () : void throws InterruptedException 等待直到计数器为 0,才被唤醒并运行;
- c.await (long timeout, Timeout unit) : void throws InterruptedException 等待直到计数器为 0 或超时,才被唤醒并运行;
# 减少计时器
- c.countDown () : 计时器 --,当计时器 == 0 时,会唤醒所有调过 await 的线程;
# 获取计时器的值
- c.getCount () : long 计时器的值;
# 在同一个点同步任务
CyclicBarrier 也叫同步屏障,在 JDK1.5 被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。
# 线程 屏障 | |
线程1 -----> | | |
线程2 ---> | 全部到达屏障后,才能通过---> | |
线程3 ----> | |
Runnable r = new Runnable() { | |
@Override | |
public void run() { | |
// do something! | |
} | |
}; | |
# 5个线程 | |
CyclicBarrier c = new CyclicBarrier(5); | |
# 5个线程结束后,执行r线程 | |
CyclicBarrier c = new CyclicBarrier(5, r); |
# 到达同步点后,等待
- c.await () : int throws InterruptedException, BrokenBarrierException 等待,直到所有的线程都调用 await,才唤醒;
- c.await (int timeout, TimeUnit unit) : int throws InterruptedException, BrokenBarrierException 等待,直到所有的线程都调用 await 或超时,才唤醒;
# 重置
与 CountDownLatch 不同的是,CyclicBarrier 可以重置回到原来的状态。
- c.reset() : void;
调用 reset 后,之前调用 await 的线程会抛出 broken 异常。
# 其它信息
- c.getNumberWaiting () : int 返回已经调用 await 方法的线程数;
- c.getParties () : int 返回剩余还没调用 await 方法的线程数;
- c.isBroken () : boolean 当其中一个线程被中止了,则其它线程也会收到 broken 异常,且该 barrier 的状态就变成 broken 状态。
# 运行阶段性并发任务
# 创建
Phaser p = new Phaser(); | |
Phaser p = new Phaser(int parties); | |
Phaser p = new Phaser(Phaser parent); | |
Phaser p = new Phaser(Phaser parent, int parties); |
# 注册
- p.register() : int;
- p.bulkRegister(int parties) : int;