1. 多线程基础概念

1.1. 进程与线程的区别

  • 进程(Process)是操作系统分配资源(如内存、文件、CPU时间等)的基本单位。每个进程是一个独立的运行环境,具有独立的内存空间,可以独立执行代码和管理资源。
  • 线程(Thread)是进程中的最小执行单位。一个进程可以包含多个线程,这些线程共享进程的内存资源(如堆、方法区等),但每个线程都有自己的栈空间用于存储局部变量、方法调用等。

区别:

  • 资源分配:进程是系统资源分配的基本单位,不同进程的资源是独立的;而线程是进程的执行单元,多个线程共享同一进程的资源。
  • 执行单元:线程是进程中实际执行代码的单位,进程内的多个线程可以同时并发执行,提高了程序执行的效率。
  • 开销:进程的创建、切换、销毁开销较大,线程的开销则较小,因此多线程编程能够更高效地利用 CPU。

1.2. 线程的生命周期

Java 中的线程生命周期主要包括以下几个阶段:

  • 创建(New):当线程对象被创建时,它处于“新建”状态。此时线程没有运行,也没有被分配 CPU 时间。
  • 就绪(Runnable):调用 start() 方法后,线程进入就绪状态,表示已经准备好执行,但尚未获得 CPU 调度。
  • 运行(Running):当线程获得 CPU 时间后,进入运行状态,并开始执行 run() 方法中的代码。
  • 阻塞/等待(Blocked/Waiting/Timed Waiting)
    • 阻塞(Blocked):线程等待某个锁释放或者资源准备好。
    • 等待(Waiting):线程主动放弃 CPU,并进入等待状态,直到某个条件(如锁的释放或另一个线程唤醒它)成立。
    • 限时等待(Timed Waiting):线程在指定时间后自动返回就绪状态,比如 sleep()wait(long timeout)
  • 终止(Terminated):当 run() 方法执行完毕或者线程被异常终止时,线程进入终止状态。

1.3. 线程的优先级

Java 中的线程有优先级,使用 Thread.setPriority(int priority) 可以设置优先级,默认优先级为 Thread.NORM_PRIORITY (值为 5)。线程的优先级取值范围在 1 到 10 之间:

  • Thread.MIN_PRIORITY(值为1)表示最低优先级;
  • Thread.MAX_PRIORITY(值为10)表示最高优先级;
  • Thread.NORM_PRIORITY(值为5)表示默认的普通优先级。

线程调度是由操作系统来控制的,线程的优先级会影响操作系统调度线程的顺序。通常情况下,高优先级的线程有更高的机会获得 CPU 资源,但是 Java 的线程优先级只是建议,具体实现依赖于底层操作系统。

1.4. 线程的安全性

在多线程编程中,多个线程共享进程中的资源,这会导致线程安全性问题。常见的线程安全问题包括:

  • 共享资源的访问:多个线程同时访问和修改同一个共享资源时,可能会导致数据不一致。例如,两个线程同时对同一个变量进行自增操作,可能导致丢失更新。
  • 死锁(Deadlock):多个线程在等待彼此持有的资源,导致相互阻塞,程序无法继续执行。
  • 竞争条件(Race Condition):多个线程竞争访问共享资源,而访问的结果取决于线程调度的顺序,从而导致不确定的行为。

解决方案

  • 同步(Synchronized):使用 synchronized 关键字可以确保同一时间只有一个线程能够访问某个资源或执行某段代码,从而避免资源的冲突。例如:
  • 锁(Locks):除了 synchronized 之外,Java 提供了 java.util.concurrent.locks.Lock 接口,允许更灵活的锁机制。常用的实现类有 ReentrantLock,它提供了 tryLock() 方法,可以非阻塞地尝试获取锁。
  • 线程通信:线程之间可以通过 wait()notify() 进行通信,协同完成任务,避免不必要的资源竞争。
  • 原子操作:Java 的 java.util.concurrent.atomic 包提供了一些线程安全的原子操作类(如 AtomicInteger, AtomicLong),这些类可以通过原子操作的方式来保证线程安全。

1.5. 常见的线程问题

  • 线程饥饿(Thread Starvation):由于高优先级线程一直占用 CPU 资源,导致低优先级线程无法获得 CPU 时间,长时间处于等待状态。
  • 线程泄露(Thread Leak):由于某些线程未能正确关闭或终止,造成系统资源的浪费,影响性能。

2. Java中的多线程机制

2.1. 创建线程的方式

在 Java 中创建线程的两种主要方式是:

  1. 继承 Thread
  2. 实现 Runnable 接口

2.1.1. 继承 Thread

继承 Thread 类是最直接的创建线程的方式。每个线程都是 Thread 类的一个对象,通过重写 Thread 类的 run() 方法,线程的任务逻辑可以被定义。创建线程后调用 start() 方法来启动线程,而不是直接调用 run() 方法,否则不会启动新线程。

实现步骤

  1. 定义一个类继承 Thread 类。
  2. 在子类中重写 run() 方法,定义线程执行的任务。
  3. 创建该子类的对象并调用 start() 方法来启动线程。

示例

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程正在执行:" + Thread.currentThread().getName());
    }
}

public class Main {
    public static void main(String[] args) {
        MyThread thread1 = new MyThread();
        thread1.start();  // 启动线程
    }
}

解释

  • run() 方法中的代码就是线程的执行体。
  • start() 方法用于启动线程,并由 JVM 调用线程的 run() 方法。不能直接调用 run(),否则它只会作为普通方法执行,而不是启动新线程。

特点: 

  • 无法继承其他类:由于 Java 不支持多继承,如果类已经继承了 Thread 类,就不能再继承其他类,限制了类的设计灵活性。
  • 耦合度高:任务代码和线程本身强耦合,不利于代码复用和维护。

2.1.2. 实现 Runnable 接口

实现 Runnable 接口是更常见、更灵活的方式。Runnable 接口只有一个方法 run(),该方法被线程执行时自动调用。

实现步骤

  1. 创建一个类实现 Runnable 接口。
  2. 实现 run() 方法。
  3. Runnable 实例传递给 Thread 对象,然后调用 start() 启动线程。

示例

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("线程正在执行:" + Thread.currentThread().getName());
    }
}

public class Main {
    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        Thread thread = new Thread(myRunnable);
        thread.start();  // 启动线程
    }
}

特点:

  • 避免类的多继承限制:一个类可以实现多个接口,因此类可以同时继承其他类和实现 Runnable 接口,设计上更加灵活。
  • 任务与线程分离:任务逻辑被封装在 Runnable 中,线程的管理交给 Thread,这样代码更加模块化、复用性更强。
  • 共享资源更加方便:可以轻松地在多个线程之间共享同一个 Runnable 实例,进而共享数据和资源。

继承基础类并实现 Runnable 接口示例

定义一个新类 MyRunnableClass,它继承 BaseClass 并实现 Runnable 接口,同时在 run() 方法中定义线程的任务。

class MyRunnableClass extends BaseClass implements Runnable {
    @Override
    public void run() {
        System.out.println("线程正在执行:" + Thread.currentThread().getName());
        // 可以调用父类的方法
        display();
    }
}

2.1.3. 两种方式的对比

特点继承 Thread实现 Runnable 接口
实现复杂度较为简单,直接继承类和重写方法稍复杂,需要传递给 Thread 对象
可复用性任务与线程耦合,不便复用任务任务与线程分离,任务代码可以复用
多继承限制由于 Java 不支持多继承,类无法继承其他类可以实现多个接口,类可以继承其他类
资源共享性适合不需要共享资源的简单场景适合需要多个线程共享数据的场景
性能性能差别不大性能差别不大

2.2 Thread类的属性与方法

2.2.1. Thread 类的常用属性

  • 线程名称 (name):每个线程都有一个名称,可以在创建时指定,也可以通过默认名称(如 Thread-0, Thread-1)自动生成。
  • 线程优先级 (priority):线程的优先级影响它被调度器选择执行的顺序。优先级范围为 1(最低优先级)到 10(最高优先级),默认为 5。
  • 是否为守护线程 (daemon):守护线程是后台线程,当程序中的所有非守护线程都终止时,守护线程也会随之终止。

守护线程的作用

  • 后台服务任务

    • 守护线程通常被用于执行一些持续运行的后台任务,比如监听、清理、监控等工作。它们在用户线程执行时提供服务,但不干扰用户线程的执行。
    • 一个常见的例子是 Java 的垃圾回收机制,它是由守护线程负责的。
  • 资源清理与管理

    • 守护线程可以用于管理系统资源,比如定时清理缓存,释放不再需要的资源等。
    • 比如可以用守护线程来周期性地检查一些空闲连接,关闭无效的数据库连接等。
  • 定期任务执行

    • 守护线程可用于定期执行一些维护性任务,如日志备份、定期数据同步、定时发送消息等。

2.2.2. Thread 类的常用方法

1. 线程管理方法

start():用于启动线程,进入就绪状态,等待 CPU 调度。调用此方法后,线程的 run() 方法将会被执行。

注意:直接调用 run() 不会启动新线程,只会在当前线程中运行任务。

Thread thread = new Thread(new MyRunnable());
thread.start();  // 启动新线程

run():线程执行的入口方法,定义线程要执行的任务逻辑。通常是通过实现 Runnable 接口或继承 Thread 类来重写此方法。

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程正在运行:" + Thread.currentThread().getName());
    }
}

join() 使当前线程等待另一个线程执行完毕。比如,如果调用 thread.join(),当前线程会暂停执行,直到 thread 线程执行完成。

Thread thread = new Thread(new MyRunnable());
thread.start();
thread.join();  // 等待 thread 线程执行完毕

sleep(long millis):使当前线程暂停执行一段时间(以毫秒为单位)。线程在休眠期间不会占用 CPU 资源。

try {
    Thread.sleep(1000);  // 当前线程暂停 1 秒
} catch (InterruptedException e) {
    e.printStackTrace();
}

interrupt():中断线程。如果线程在执行 sleep()wait() 等阻塞操作时被中断,会抛出 InterruptedException

thread.interrupt();  // 中断线程

isInterrupted():判断线程是否被中断。可以用来检查一个线程的中断状态。

if (thread.isInterrupted()) {
    System.out.println("线程已被中断");
}

setDaemon(boolean on):设置线程是否为守护线程。守护线程在 JVM 中所有非守护线程执行完毕后自动退出。必须在线程启动之前调用此方法。

thread.setDaemon(true);  // 设置为守护线程

isDaemon():判断线程是否为守护线程。

boolean isDaemon = thread.isDaemon();
2. 线程信息方法

getName() / setName(String name):获取或设置线程的名称。

thread.setName("MyThread");  // 设置线程名称
System.out.println(thread.getName());  // 获取线程名称

getId():返回线程的唯一 ID。线程 ID 是由 JVM 自动分配的,并且在一个 JVM 生命周期中是唯一的。

long id = thread.getId();

getPriority() / setPriority(int priority):获取或设置线程的优先级。线程的优先级可以影响线程被调度的频率,但不能保证优先级高的线程一定先执行。

thread.setPriority(Thread.MAX_PRIORITY);  // 设置线程为最高优先级
System.out.println(thread.getPriority());  // 获取线程优先级

getState():获取线程的当前状态。线程状态可以是以下之一:NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED

Thread.State state = thread.getState();
3. 线程相关的静态方法

currentThread():返回当前正在执行的线程对象。

Thread current = Thread.currentThread();
System.out.println("当前线程:" + current.getName());

yield():提示调度器当前线程愿意让出 CPU,其他线程有机会获得执行权。它只是一个提示,调度器可能会忽略它。

Thread.yield();  // 暂时让出 CPU 给其他线程
4. 线程状态控制方法

wait():使当前线程进入等待状态,直到其他线程调用 notify()notifyAll() 方法。

必须在同步代码块或同步方法中调用,否则会抛出 IllegalMonitorStateException

notify()notifyAll()

  • notify():唤醒一个在 wait() 状态下等待的线程。
  • notifyAll():唤醒所有在 wait() 状态下等待的线程。
synchronized (lock) {
    lock.notify();  // 唤醒等待的线程
}

2.3. 同步与锁机制

在多线程环境中,多个线程可能会同时访问同一资源(如共享变量或对象),这会导致数据不一致或出现竞态条件。为了防止这种情况发生,Java 提供了 同步(synchronization)显式锁(Lock) 机制。

2.3.1. synchronized 关键字

synchronized 是 Java 提供的一种内置同步机制,用于解决多线程并发访问共享资源时的数据不一致问题。它可以确保同一时刻只有一个线程能够访问某个共享资源,从而避免线程之间的冲突。

1.synchronized 的两种用法
同步方法

synchronized 可以用于方法级别,声明为同步方法时,表示该方法在同一时刻只能被一个线程访问。它会锁定调用该方法的对象实例(即对象锁)。

示例

public class Counter {
    private int count = 0;

    // synchronized 关键字,确保只有一个线程能同时访问此方法
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

在这个例子中,increment() 方法和 getCount() 方法是同步的,任何一个线程在调用这些方法时,其他线程都必须等待,直到当前线程执行完毕。这里 synchronized 是锁定当前对象 (this),确保同一时刻只有一个线程可以调用该方法。

同步代码块

有时我们不需要对整个方法进行同步,只需对特定的代码片段进行同步,此时可以使用同步代码块。同步代码块可以更精确地控制哪些部分需要同步,从而提高程序的效率。

示例

public class Counter {
    private int count = 0;

    public void increment() {
        synchronized (this) {  // 只同步这段代码
            count++;
        }
    }

    public int getCount() {
        return count;
    }
}

在这个例子中,只有 count++ 操作是同步的。同步代码块通过 synchronized 关键字手动指定需要加锁的对象(在这里是 this),从而避免了同步整个方法的开销。

2. 锁的对象

synchronized 的工作原理是基于对象锁,每个对象都有一个与之关联的锁(称为监视器锁)。当线程进入同步方法或同步代码块时,必须先获取锁,锁的对象可以是:

  • 实例对象锁:常用于 synchronized(this) 或者同步实例方法,锁定当前对象实例。
  • 类对象锁:常用于静态方法同步,即 synchronized 用在静态方法上时,它会锁定当前类的 Class 对象,确保多个线程不能同时执行该类的静态同步方法。

示例:类对象锁

public class Counter {
    private static int count = 0;

    // 静态同步方法,锁定的是类对象 Counter.class
    public static synchronized void increment() {
        count++;
    }
}
3. synchronized 的特点
  • 可重入锁synchronized 是一种可重入锁。同一线程在持有某个锁时,可以再次进入同一锁定的同步方法或同步代码块,不会被阻塞。
  • 自动释放锁:当线程退出同步代码块或同步方法时,synchronized 会自动释放锁。
4. synchronized 的缺点
  • 性能开销:在高并发场景下,使用 synchronized 进行锁定会导致线程阻塞,影响程序的并发性。
  • 粗粒度锁定synchronized 作用在方法上或代码块中,锁的粒度较大,容易导致线程竞争激烈的情况下性能下降。

2.3.2. 显式锁(Lock 接口)

Java 提供了 java.util.concurrent.locks.Lock 接口以及其实现类(如 ReentrantLock)来提供显式锁机制。这种锁比 synchronized 更灵活,可以人为地手动为某段代码加上锁与释放锁。

1. Lock 接口及其主要方法

Lock 接口定义了用于控制锁的基础操作,常用方法有:

  • lock():获取锁,如果锁不可用,当前线程将等待,直到锁可用。
  • unlock():释放锁,必须确保调用 lock() 后执行 unlock(),通常会在 finally 块中释放锁。
  • tryLock():尝试获取锁,返回 true 表示获取成功,false 表示失败。
  • lockInterruptibly():允许线程在获取锁的过程中响应中断。
2. ReentrantLock 类

ReentrantLockLock 接口的常用实现类,它是一个可重入锁,允许一个线程多次获得同一个锁。相比 synchronized,它提供了更多的锁控制选项。

示例

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private int count = 0;
    private final Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();  // 显式获取锁
        try {
            count++;
        } finally {
            lock.unlock();  // 确保在最后释放锁
        }
    }

    public int getCount() {
        return count;
    }
}

解释

  • lock.lock() 用于获取锁,进入临界区。
  • try...finally 确保无论临界区代码是否抛出异常,锁都会在 finally 块中被释放,以防止死锁。
3. ReentrantLock 的特点

灵活性:可以通过 tryLock() 非阻塞地尝试获取锁,或者使用 lockInterruptibly() 响应中断。

将示例代码中的lock.lock();替换成    if (lock.tryLock()){ } else{ }

如果在某个任务的代码中使用了lock.lockInterruptibly(); 则在线程启动之后等待锁(等待执行)的过程中可以通过t2.interrupt();(t2是个线程)来打断t2线程获取锁。

公平锁与非公平锁ReentrantLock 可以选择公平模式或非公平模式。公平模式下,线程会按获取锁的顺序排队;非公平模式则允许线程“插队”,更高效但可能导致线程饥饿。

private final Lock lock = new ReentrantLock(true);

通过以上构造函数可以将锁设置成公平模式。

private final Lock lock = new ReentrantLock();

如果用这种无参的构造函数默认是非公平模式。

可重入性:与 synchronized 一样,ReentrantLock 也是可重入的,同一线程可以多次获得相同的锁。

4. ReentrantLock 的缺点
  • 必须手动释放锁:与 synchronized 不同,ReentrantLock 需要手动调用 unlock() 来释放锁,如果忘记调用可能会导致死锁。
  • 更复杂的代码:使用显式锁时,需要显式地控制锁的获取和释放,代码复杂度较高,容易出错。

2.4 线程池的基础概念

线程池 是 Java 中的一种多线程管理机制,用于复用线程资源,避免频繁创建和销毁线程带来的性能开销。通过线程池,可以将多个任务分配给少量的线程处理,从而提高系统的效率和响应速度。

2.4.1 线程池的基本原理

  • 线程复用:线程池中维护了一个固定数量的线程。这些线程可以反复使用来执行多个任务,而不需要每次都重新创建和销毁线程。这样减少了系统资源的消耗,并提升了执行效率。

  • 任务排队:当线程池中的线程都在执行任务时,新的任务会被放入等待队列中,等待空闲线程来处理。这种任务排队机制可以防止系统因为创建过多线程而导致资源耗尽。

  • 线程数量控制:线程池可以限制同时运行的线程数量,避免因过多线程导致系统性能下降。通过合理配置线程池的大小,可以充分利用系统资源。

2.4.2 线程池的优势

  • 减少资源开销:创建和销毁线程需要系统资源,线程池通过复用线程,减少了这些开销。

  • 提高响应速度:任务提交后,线程池可以立即从池中取出线程处理任务,而不需要等待创建线程,提高了任务的响应速度。

  • 便于线程管理:线程池统一管理线程的生命周期,可以轻松控制线程的启动、执行和终止。此外,线程池还能设置超时策略、拒绝策略等,灵活处理各种任务场景。

2.4.3 Java 中的线程池

Java 提供了 Executor 框架来管理线程池,通过 Executors 工具类可以轻松创建不同类型的线程池。常见的线程池类型有:

  • 固定大小线程池newFixedThreadPool(),创建一个固定数量线程的线程池,超出数量的任务会在队列中等待。
  • 缓存线程池newCachedThreadPool(),可以根据需要创建新线程,适合处理大量短期任务。
  • 单线程池newSingleThreadExecutor(),只用一个线程执行任务,确保所有任务按顺序执行。

这些线程池类型可以根据不同的应用场景选择合适的配置,从而优化性能。

2.4.4 线程池的工作流程

  1. 当任务提交到线程池时,线程池会先检查是否有空闲线程。
  2. 如果有空闲线程,则由该线程执行任务。
  3. 如果没有空闲线程,任务会被放入等待队列中,直到有线程空闲下来。
  4. 当任务完成后,线程并不会被销毁,而是回到线程池中等待下一个任务。

3. Spring Boot中的多线程应用

3.1 @Async 注解的使用

在 Spring Boot 中,@Async 注解用于实现异步方法调用,通过该注解,某些任务可以在独立的线程中执行,而不会阻塞主线程或调用方线程。其主要功能是让某些耗时任务或非关键任务在后台异步执行,以提升系统的响应速度和并发处理能力。

独立的线程 意味着该任务不会在主线程(如 main 线程)中执行,而是由 Spring Boot 通过线程池分配一个后台线程来执行该任务。主线程可以继续处理其他逻辑,而异步任务则在后台完成。

当你使用 @Async 注解时,标注的方法会在一个独立的线程中执行,而不会阻塞当前主线程或调用方线程。 

3.1.1. 开启异步支持

为了使用 @Async 注解,必须在 Spring Boot 应用中开启异步支持。通常,通过在配置类或主类上添加 @EnableAsync 注解来启用异步功能。

示例

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync  // 启用异步方法调用支持
public class AsyncConfig {
    // 可以在此类中定义其他配置
}

解释

  • @EnableAsync 告诉 Spring 容器开启异步方法的处理功能,扫描带有 @Async 注解的方法并将其放入独立的线程中执行。
  • 这个注解可以放在任何 @Configuration 类中,通常是在 Spring Boot 启动类或者一个专门的配置类中使用。

3.1.2. 使用 @Async 注解

当开启异步支持后,你可以在任何需要异步执行的方法上添加 @Async 注解。被 @Async 标注的方法会被放到后台线程池中执行,而不会阻塞调用该方法的线程。

示例代码

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {

    @Async  // 异步执行
    public void performAsyncTask() {
        System.out.println("异步任务执行开始:" + Thread.currentThread().getName());
        try {
            Thread.sleep(3000);  // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("异步任务执行结束:" + Thread.currentThread().getName());
    }
}

解释

  • @Async 使 performAsyncTask() 方法异步执行。当该方法被调用时,它会在后台另一个线程中执行,而主线程可以继续执行其他操作。主线程是程序启动时自动创建的线程。它是执行main()方法的线程。

3.1.3. 调用异步方法

当你调用一个带有 @Async 注解的方法时,Spring 会将该方法交给线程池去执行,调用方线程不会被阻塞,它可以继续执行后续的任务。

示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TaskCallerService {

    @Autowired
    private AsyncService asyncService;

    public void callAsyncMethod() {
        System.out.println("主线程执行:" + Thread.currentThread().getName());
        asyncService.performAsyncTask();  // 调用异步方法
        System.out.println("主线程继续执行:" + Thread.currentThread().getName());
    }
}

解释

  • 当asyncService.performAsyncTask()被调用时,该方法会在后台其他线程中执行(如 task-1),主线程 main 不会等待它完成,继续执行自己的逻辑。

3.1.4 @Async 中指定使用自定义线程池

@Async("customThreadPool")
public void performCustomAsyncTask() {
    System.out.println("异步任务执行:" + Thread.currentThread().getName());
}

解释

  • ThreadPoolTaskExecutor 是 Spring 提供的线程池实现,允许你自定义线程池的各种参数。
  • @Async("customThreadPool") 中指定了自定义线程池 customThreadPool 来执行异步任务。

3.1.5. 带返回值的异步方法(返回一个CompletableFuture对象)

有时,你可能需要异步方法返回一个结果。为了处理这种情况,@Async 方法可以返回  CompletableFuture,这样调用方可以获取异步任务的执行结果。

示例代码

@Async
public CompletableFuture<String> performTaskWithResult() {
    try {
        // 模拟耗时任务
        Thread.sleep(3000);
        if (true) {  // 故意抛出异常
            throw new RuntimeException("任务执行过程中发生异常");
        }
        return CompletableFuture.completedFuture("任务成功");
    } catch (InterruptedException e) {
        return CompletableFuture.completedFuture("任务中断");
    } catch (RuntimeException e) {  // 捕获 RuntimeException
        return CompletableFuture.completedFuture("任务执行过程中出现运行时异常");
    }
}
  • throw new RuntimeException(...) 是在显式地抛出一个 RuntimeException 异常。抛出异常之后会从try语句中跳出并去匹配catch语句。
  • RuntimeExceptionInterruptedException 之间没有继承关系,因此 RuntimeException 不会被 catch (InterruptedException) 捕获。
  • 当抛出 RuntimeException 后,程序会跳过 try 块中后续的代码,直接进入对应的 catch 块,如果存在匹配的 catch,否则异常会继续向上传递。

调用带返回值的异步方法

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TaskCallerService {

    @Autowired
    private AsyncService asyncService;

    public void callAsyncMethodWithResult() {
        CompletableFuture<String> futureResult = asyncService.performAsyncTaskWithResult();

        futureResult.thenAccept(result -> {
            System.out.println("异步任务结果:" + result);
        });

        System.out.println("主线程继续执行:" + Thread.currentThread().getName());
    }
}

解释

  • CompletableFuture 允许异步返回结果,result 是异步任务完成后返回的结果。
  • thenAccept() 是一种回调方法,用于在异步任务完成后处理结果。
  • thenAccept(result -> {...}) 表示当 futureResultCompletableFuture 实例)异步任务完成后,传递 resultthenAccept(),执行其中的逻辑。
  • 主线程在继续执行其他任务时,异步任务在后台运行,当任务完成后会通过回调处理结果。

输出结果 

主线程继续执行:main
异步任务结果:任务成功
  • callAsyncMethodWithResult() 方法在主线程中被调用,立即开始执行。
  • asyncService.performAsyncTaskWithResult() 是一个异步方法,因为它是被 @Async 注解标注的,所以会在一个独立的线程中执行。
  • 主线程不会等待异步任务的完成。它会立即执行 System.out.println("主线程继续执行:" + Thread.currentThread().getName());
  • 后台的异步任务继续执行,当任务完成时,CompletableFuturethenAccept() 回调会被触发。
  • thenAccept() 方法会接收异步任务的返回结果,并执行 System.out.println("异步任务结果:" + result);,输出异步任务的结果。

一般不用CompletableFuture.get()方法返回异步执行结果。(会阻塞主线程)

public void callAsyncMethodWithBlocking() throws Exception {
    CompletableFuture<String> futureResult = asyncService.asyncTaskWithResult();

    // 阻塞等待异步任务完成并获取结果
    String result = futureResult.get();

    // 输出结果
    System.out.println("异步结果:" + result);
}
  • get() 会阻塞调用方线程,直到异步任务完成。
  • thenAccept() 的非阻塞方式不同get() 会强制主线程等待结果返回,这种方式适合需要立即处理任务结果的场景。

3.1.6 使用 @AsyncCompletableFuture.supplyAsync()执行异步任务的区别

特性@AsyncCompletableFuture.supplyAsync()
来源Spring 框架Java 标准库
使用场景适用于 Spring 应用适用于任何 Java 应用,包括非 Spring 项目
异步处理方式自动将标注的方法放入后台线程池执行显式提交异步任务,通过 CompletableFuture
线程池管理Spring 自动管理,或通过配置自定义默认 ForkJoinPool,可自定义 Executor
代码简洁性简单,标注方法即可异步执行更灵活,但需要手动提交异步任务
依赖依赖 Spring 框架,需要启用 @EnableAsync无框架依赖,适用于所有 Java 应用
异常处理使用 Spring 异常处理机制使用 CompletableFuture 的异常处理机制
灵活性相对简单,Spring 管理异步和线程池高度灵活,开发者可自定义线程池和处理流程

3.2 定时任务调度:@Scheduled 和 @EnableScheduling

在 Spring Boot 中,定时任务调度是通过 @Scheduled 注解来实现的,结合 @EnableScheduling 注解可以非常方便地在项目中启用和管理定时任务。Spring Boot 提供了多种调度方式,如基于时间间隔、固定延迟和 Cron 表达式的任务调度,适合各种场景下的定时任务需求。

3.2.1. 启用定时任务支持:@EnableScheduling

要在 Spring Boot 中启用定时任务调度,首先需要在应用程序中使用 @EnableScheduling 注解。该注解用于告诉 Spring Boot 启用任务调度器,以便可以扫描并执行使用了 @Scheduled 注解的方法。

示例:在配置类或主类中启用定时任务支持

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling  // 启用定时任务调度
public class SchedulingConfig {
    // 配置类
}

解释

  • @EnableScheduling 注解通常添加到 Spring Boot 启动类或一个专门的配置类上,启用整个应用的定时任务功能。

3.2.2. 使用 @Scheduled 注解定义定时任务

@Scheduled 注解可以标记在任何 Spring 管理的组件(如 @Service@Component 等)的某个方法上,定义该方法为定时任务。Spring Boot 会根据指定的调度策略定期执行该方法。

3.2.3. 定时任务的调度策略

@Scheduled 提供了三种常见的调度策略:固定速率执行固定延迟执行Cron 表达式。每种方式都可以灵活设置任务的执行周期。

1. 固定速率执行(fixedRate
  • fixedRate 参数表示任务以固定的时间间隔执行,不考虑上一次任务的执行时间。也就是说,每隔指定的时间就会触发一次任务执行,可能导致任务重叠执行。

示例

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class FixedRateTaskService {

    @Scheduled(fixedRate = 5000)  // 每隔5秒执行一次
    public void performTask() {
        System.out.println("任务以固定速率执行:" + Thread.currentThread().getName());
    }
}

解释

  • fixedRate = 5000 表示任务每隔 5 秒执行一次,无论上一次任务是否执行完毕。如果任务执行时间超过 5 秒,下一个任务可能会并发执行。
2. 固定延迟执行(fixedDelay
  • fixedDelay 参数表示上一次任务执行完成后,延迟指定时间后再开始执行下一次任务。与 fixedRate 不同,它保证任务执行不会重叠。

示例

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class FixedDelayTaskService {

    @Scheduled(fixedDelay = 5000)  // 上次任务结束后延迟5秒执行
    public void performTask() {
        System.out.println("任务以固定延迟执行:" + Thread.currentThread().getName());
    }
}

解释

  • fixedDelay = 5000 表示任务在上次任务执行完毕后,延迟 5 秒再开始下一次任务。这种方式可以保证任务执行的连续性,任务不会并发执行。
3. Cron 表达式(cron
  • cron 参数用于更灵活地定义任务调度,采用 Cron 表达式来设置复杂的时间条件。Cron 表达式通常由六个或七个字段组成,依次代表秒、分、时、日、月、星期和(可选的年)。

示例

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class CronTaskService {

    @Scheduled(cron = "0 0/1 * * * ?")  // 每分钟的第0秒执行
    public void performTask() {
        System.out.println("任务按Cron表达式执行:" + Thread.currentThread().getName());
    }
}

Cron 表达式格式

  • 秒 分 时 日 月 周 年(可选)

示例解释

  • cron = "0 0/1 * * * ?" 表示每分钟的第 0 秒执行一次任务。Cron 表达式的 ? 表示不指定该字段。
  • 使用 Cron 表达式可以设置复杂的时间调度,如每天某个时间点执行任务,或每周一早上 8 点执行任务。
4. 示例:多种任务调度策略的结合
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class ScheduledTaskService {

    // 固定速率任务,每隔5秒执行一次
    @Scheduled(fixedRate = 5000)
    public void fixedRateTask() {
        System.out.println("固定速率任务执行:" + Thread.currentThread().getName());
    }

    // 固定延迟任务,每次执行完毕后延迟5秒再执行
    @Scheduled(fixedDelay = 5000)
    public void fixedDelayTask() {
        System.out.println("固定延迟任务执行:" + Thread.currentThread().getName());
    }

    // Cron表达式任务,每分钟的0秒执行
    @Scheduled(cron = "0 0/1 * * * ?")
    public void cronTask() {
        System.out.println("Cron表达式任务执行:" + Thread.currentThread().getName());
    }
}
5. 配置多线程定时任务(可选)

默认情况下,定时任务是在单线程中依次执行的。如果任务执行时间较长,可能会阻塞后续任务的执行。可以通过配置线程池来让多个定时任务并发执行。(配置完线程池才具备多个定时任务并发执行的能力,配置完之后当具有多个定时任务会才会并发执行)

3.3 多线程任务的异常处理

在多线程或异步任务的环境中,异常处理是非常重要的,因为异步任务在独立的线程中运行,异常不会直接被捕获到主线程。如果不处理,未捕获的异常会导致任务执行失败,而不会影响主线程的执行。为了解决这一问题,Spring 提供了多种机制来处理异步任务中的异常。 

3.3.1. 返回值异步任务

处理步骤

  1. 调用异步方法:异步任务返回 CompletableFuture,任务在后台线程执行。
  2. 捕获异常:使用 exceptionally() 捕获执行过程中的异常。
  3. 返回默认值:在异常处理中返回默认值,确保任务失败时仍能提供结果。
  4. 处理结果:通过 thenAccept() 处理任务的正常结果或异常返回的默认值。

示例代码

import java.util.concurrent.CompletableFuture;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {

    @Async
public CompletableFuture<String> performTaskWithResult() {
    try {
        Thread.sleep(3000);
        if (true) {
            throw new RuntimeException("任务执行过程中发生异常");
        }
        return CompletableFuture.completedFuture("任务成功");
    } catch (InterruptedException e) {
        return CompletableFuture.completedFuture("任务中断");
    }
}


}

当throw new RuntimeException("任务执行过程中发生异常");,因为没有对应类型的catch块对应上 RuntimeException类型的异常处理(图中处理的是InterruptedException类型的异常),这个时候异步任务没有返回completedFuture类型对象,就会调用 exceptionally() 捕获执行过程中的异常(RuntimeException("任务执行过程中发生异常"))。

调用并处理异常的代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TaskCallerService {

    @Autowired
    private AsyncService asyncService;

    public void callAsyncMethodWithExceptionHandling() {
        // 调用异步任务
        CompletableFuture<String> futureResult = asyncService.performTaskWithResult();

        // 捕获异常并提供默认值
        futureResult.exceptionally(ex -> {
            System.out.println("异步任务失败,异常:" + ex.getMessage());
            return "默认值";  // 返回默认值
        }).thenAccept(result -> {
            System.out.println("异步任务结果:" + result);  // 处理返回结果
        });

        System.out.println("主线程继续执行...");
    }
}

解释

  • 正常情况:如果异步任务成功完成且没有抛出异常,exceptionally() 不会被触发,结果将会传递到下一个回调方法(如 thenAccept())。
  • 异常情况:如果异步任务抛出异常,exceptionally() 将会被触发,处理异常并返回一个默认值,该默认值会作为最终结果传递给 thenAccept()
  • 这里判断异步任务是否成功的标准是通过异步方法是否成功返回了completedFuture类的对象来判定的,如果任务失败,异步方法不会返回任何值,只会throw new RuntimeException("任务执行过程中发生异常"),然后throw的这个异常被ex获取到。
  • 为什么需要返回默认值:为了确保后续的 thenAccept()回调函数能够获取到异步方法的返回值并传入reult。如果异步任务发生了异常,且没有处理,整个 CompletableFuture 链可能会中断,导致后续的回调(如 thenAccept())无法执行,最终导致程序逻辑无法按预期进行。通过在 exceptionally() 中返回一个默认值,即使异步任务失败了,程序依然能够继续执行,并传递该默认值给 thenAccept() 处理。


3.3.2. 无返回值异步任务

处理步骤

  1. 定义异常处理器:创建一个类实现 AsyncUncaughtExceptionHandler,处理所有未捕获的异步任务异常。
  2. 配置异常处理器:通过 AsyncConfigurer 接口将自定义的异常处理器配置到 Spring 的异步机制中。
  3. 异步任务执行:使用 @Async 注解定义异步方法,并在调用时触发异步执行。
  4. 捕获和处理异常:当异步任务发生异常时,CustomAsyncExceptionHandler 会捕获异常,并执行日志记录或其他处理逻辑。
1.定义自定义异常处理器
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;

@Component
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        // 处理异步任务中的未捕获异常
        System.out.println("异步任务异常:" + ex.getMessage());
        System.out.println("发生异常的方法:" + method.getName());
        // 记录日志或发送通知
    }
}
  • AsyncUncaughtExceptionHandler 能捕获所有未捕获的异常,只要它们是在带有 @Async 注解的、无返回值void)的方法中抛出的。这包括任何类型的异常(例如 RuntimeExceptionNullPointerException 等),因为 Throwable 是所有异常类型的父类。只有无返回值(void)的异步任务会进入这个处理器。
  • Throwable ex: 这是异步任务中抛出的异常对象,包含了异常的具体信息(例如异常类型、异常消息等)。你可以通过 ex.getMessage() 获取异常消息,或者通过 ex.getClass() 获取异常的类型。

  • Method method: 这是引发异常的异步方法的 Method 对象,表示是哪一个方法抛出的异常。你可以通过 method.getName() 获取抛出异常的方法的名字,通过 method.getDeclaringClass() 获取方法所在的类。

  • Object... params: 这是引发异常的方法的参数列表,表示在执行这个方法时传入的参数。你可以通过 params 来遍历传入的参数并进行日志记录、调试或异常处理。

2.配置异常处理器
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;

@Configuration
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();  // 使用自定义异常处理器
    }
}

在实现 AsyncConfigurer 的配置类中去进行自定义的线程池与自定义的异步任务处理器的配置有个好处,就是在使用这二者的时候,不需要通过 @Autowired 手动注入,Spring 会自动将这些配置应用到异步任务中。

3.无返回值的异步任务示例
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {

    @Async
    public void performTaskWithException() {
        System.out.println("异步任务开始...");
        // 故意抛出异常
        if (true) {
            throw new RuntimeException("任务执行过程中发生异常");
        }
        System.out.println("异步任务结束");
    }
}

解释

  1. AsyncUncaughtExceptionHandler:用于捕获无返回值的 @Async 异步任务中的异常。
  2. handleUncaughtException():处理未捕获的异常,提供异常处理逻辑(如记录日志)。
  3. 任务异常:如果 performTaskWithException() 方法抛出异常,AsyncUncaughtExceptionHandler 会捕获并处理该异常。

4. 通过线程池管理(ThreadPoolTaskExecutor)

在并发编程中,线程池 是管理线程资源的核心机制,能够提高系统的性能和资源利用效率。Spring Boot 提供了多种方式来管理和配置线程池,最常见的方式是使用 ThreadPoolTaskExecutor 来创建和管理线程池。通过 Executor 接口,你可以灵活地调度和管理多线程任务的执行,避免频繁创建和销毁线程所带来的资源浪费。

ThreadPoolTaskExecutor 是 Spring 中用于线程池管理的常见实现类,基于 Java 的 ThreadPoolExecutor,并进行了封装,便于 Spring 环境中的使用。它允许我们自定义线程池的参数,以满足不同任务的需求。

4.1 主要配置参数

4.1.1. 核心线程数 (corePoolSize)

核心线程数是线程池中的最小线程数量,即使线程池中没有任务,这些线程也会保持存活。

当有新任务提交时,优先分配给核心线程处理。如果当前任务数量小于核心线程数,任务会被立即执行。

executor.setCorePoolSize(5);  // 核心线程数量设为5
  • 这里线程池中将始终有 5 个核心线程在等待任务,即使线程空闲时也不会销毁。

4.1.2. 最大线程数 (maxPoolSize)

这是线程池能够支持的最大线程数量。当任务量超出核心线程数时,线程池会创建新的线程,直到达到最大线程数为止。

当任务数量很大时,最大线程数允许线程池动态扩展以应对高并发场景。

executor.setMaxPoolSize(10);  // 最大线程数设为10
  • 当任务负载增加时,线程池可以扩展到最多 10 个线程来处理任务。

4.1.3. 队列容量 (queueCapacity)

这是线程池的任务队列长度。当所有核心线程都在工作时,新任务会被放入队列等待执行。如果队列已满,则会根据拒绝策略处理任务。

适用于需要处理大量任务时,任务可以先进入队列排队,而不是立即创建新线程。

executor.setQueueCapacity(25);  // 任务队列容量设为25
  • 如果核心线程都在工作,最多可以有 25 个任务在队列中等待执行。

4.1.4. 线程存活时间 (keepAliveSeconds)

对于超过核心线程数的非核心线程,如果它们在执行完任务后保持空闲超过设定的存活时间,它们将被销毁。这样可以避免长时间的线程资源占用,减少系统开销。

executor.setKeepAliveSeconds(60);  // 非核心线程空闲60秒后销毁
  • 非核心线程会在空闲 60 秒后被销毁,释放系统资源。

4.1.5. 线程名称前缀 (threadNamePrefix)

为线程池中的线程命名,便于调试和监控。给线程名称设置前缀可以帮助你在日志和监控系统中区分不同的线程池。

executor.setThreadNamePrefix("MyExecutor-");  // 线程名前缀
  • 线程名将会是 MyExecutor-1, MyExecutor-2 这样的形式,方便在日志和监控系统中识别。

4.1.6. 拒绝策略 (RejectedExecutionHandler)

当线程池的任务队列已满且最大线程数已经达到上限时,线程池需要一种策略来处理新提交的任务。常见的拒绝策略包括:

AbortPolicy:直接抛出异常,拒绝任务处理。

CallerRunsPolicy:让调用线程(如主线程)处理任务,而不是创建新的线程。

DiscardPolicy:直接丢弃任务,不处理也不抛异常。

DiscardOldestPolicy:丢弃队列中最早的任务,并尝试重新提交新的任务。

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  // 使用CallerRunsPolicy拒绝策略
  • 如果任务队列已满,任务将由调用线程处理(通常是主线程),避免任务丢失。

4.2 ThreadPoolTaskExecutor 使用示例

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class ThreadPoolConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);  // 核心线程数
        executor.setMaxPoolSize(10);  // 最大线程数
        executor.setQueueCapacity(25);  // 队列容量
        executor.setKeepAliveSeconds(60);  // 非核心线程空闲时间
        executor.setThreadNamePrefix("MyExecutor-");  // 线程名前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  // 拒绝策略
        executor.initialize();  // 初始化线程池
        return executor;
    }
}
  • 如果在实现 AsyncConfigurer 接口的配置类中去配置这个线程池,在使用时就不需要手动添加@Async("taskExecutor") ,Spring会自动为你配置。
  • 但是在实现 AsyncConfigurer 接口的配置类中去配置这个线程池也有个弊端,就是在所有需要使用多线程的地方都只会使用这一个线程池,无法满足需要在不同地方使用不同线程池的需求。
  • 这个配置定义了一个自定义的线程池 taskExecutor,用于处理异步任务。
  • 当任务提交时,Spring 将使用这个线程池来调度和执行任务。

4.3 任务调度的实际示例

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncTaskService {

    @Async("taskExecutor")  // 指定自定义线程池
public void executeTask(String taskName) {
    System.out.println(Thread.currentThread().getName() + " 开始执行任务:" + taskName);
    
    // 根据任务名称进行不同的业务处理
    switch (taskName) {
        case "taskA":
            performTaskA();
            break;
        case "taskB":
            performTaskB();
            break;
        default:
            System.out.println("未知任务:" + taskName);
    }

    System.out.println(Thread.currentThread().getName() + " 任务完成:" + taskName);
}

// 模拟任务A的业务处理
private void performTaskA() {
    try {
        System.out.println("执行任务A的逻辑...");
        Thread.sleep(2000);  // 模拟任务A的处理时间
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

// 模拟任务B的业务处理
private void performTaskB() {
    try {
        System.out.println("执行任务B的逻辑...");
        Thread.sleep(3000);  // 模拟任务B的处理时间
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}
  • 使用 @Async("taskExecutor") 指定了使用配置好的 taskExecutor 线程池。
  • 每次调用 performTask() 方法,任务会被分配到线程池中的线程执行,而不会阻塞主线程。
  • 这里任务的分配主要是根据传入的线程名称来判断进行哪种操作。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskCaller {

    @Autowired
    private AsyncTaskService asyncTaskService;

    public void executeTasks() {
        asyncTaskService.executeTask("taskA");
        asyncTaskService.executeTask("taskB");
        System.out.println("主线程执行完毕");
    }
}

4.4 Executor接口的实现类ThreadPoolTaskExecutor,Executor接口的子接口ExecutorService 

选择 ThreadPoolTaskExecutor 的场景(专门为Spring框架设计)

  • 与 Spring 集成:如果你正在开发基于 Spring 的应用程序,ThreadPoolTaskExecutor 与 Spring 的集成更加顺畅,尤其是与 @Async 注解结合使用时,非常方便处理异步任务。
  • 自动管理线程池的生命周期ThreadPoolTaskExecutor 是由 Spring 管理的 bean,不需要手动关闭线程池,生命周期管理方便。
  • 灵活配置线程池ThreadPoolTaskExecutor 提供了灵活的配置选项,可以通过配置类设置核心线程数、队列大小、线程前缀等。

选择 ExecutorService 的场景

  • 独立于 Spring:如果你开发的项目不依赖 Spring 框架,或者需要在 Spring 之外使用线程池,那么 ExecutorService 是更好的选择,因为它是 Java 标准库的一部分。
  • 灵活控制线程池生命周期:当你需要手动控制线程池的生命周期(如需要精确关闭或重新启动线程池)时,ExecutorService 提供了更多的控制选项。
  • 需要 invokeAll() 等高级功能ExecutorService 提供了一些更高级的功能,如批量任务提交和管理(invokeAll()invokeAny()),这些功能在 ThreadPoolTaskExecutor 中没有原生支持。

5. 线程安全问题

在并发编程中,线程安全问题是指多个线程在访问或修改共享资源时,由于线程之间的相互干扰,可能导致数据不一致、系统状态异常、或者出现不可预知的结果。线程安全问题主要源于多线程环境下对共享资源的非同步访问。

5.1 为什么会有线程安全问题

线程安全问题通常发生在以下场景:

  • 多个线程同时读写共享资源:当多个线程同时访问并修改同一个变量或对象,而这些操作没有进行适当的同步控制,可能导致数据不一致或不可预知的行为。
  • 非原子操作:某些操作看似简单,但在多线程环境下并不是原子操作。例如,自增操作(count++)实际上是由三个步骤组成:读取、增加、写入,这三个步骤如果没有同步控制,可能在多个线程并发执行时导致意外的结果。

示例:线程不安全的自增操作

public class Counter {
    private int count = 0;

    public void increment() {
        count++;  // 非原子操作,多个线程同时执行时可能出现数据不一致
    }

    public int getCount() {
        return count;
    }
}

在这个例子中,如果多个线程同时调用 increment() 方法,count++ 可能导致数据不一致。因为 count++ 并不是一个原子操作,可能会出现这样的情况:

  • 线程 A:读取了 count 的值为 5
  • 线程 B:也读取了 count 的值为 5
  • 线程 A线程 B 同时自增并写回内存,最终 count 变成了 6,而不是预期的 7

5.2 常见的线程安全问题

线程安全问题主要表现为以下几种情况:

  • 数据不一致:多个线程同时读写共享资源,可能会导致数据的不可预测性和不一致性。
  • 资源竞争:多个线程争抢同一资源,导致资源使用的不可控或性能问题。
  • 共享状态的非原子操作:操作共享数据时,多个线程可能会同时执行并覆盖对方的操作,导致操作结果不符合预期。

为了避免线程安全问题,可以使用不同的同步机制来确保多个线程在访问共享资源时是安全的,即确保同一时刻只有一个线程可以执行对共享资源的操作。

5.3 解决线程安全问题的常见方法

5.3.1. synchronized 关键字

synchronized 关键字可以确保某段代码在同一时间只能被一个线程执行,从而避免多个线程同时操作共享资源导致的数据不一致问题。

示例:

public class SynchronizedCounter {
    private int count = 0;

    // synchronized 修饰方法,确保同一时刻只有一个线程执行该方法
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}
  • synchronized 修饰的方法increment() 方法被 synchronized 修饰后,多个线程不能同时访问它。这样可以确保同一时刻只有一个线程可以对 count 进行操作,从而避免数据不一致。

synchronized 的特点

  • 阻塞行为:当一个线程获得了同步锁,其他线程必须等待该线程执行完成并释放锁后才能继续执行。
  • 保证可见性synchronized 不仅保证了线程之间的互斥执行,还保证了线程之间对共享变量修改的可见性。
  • 锁的粒度synchronized 可以锁住整个方法(隐式锁定),或者锁住代码块(显式锁定)。

5.3.2. 显式锁 (Lock)

除了 synchronized 之外,Java 提供了 Lock 接口,允许开发者手动控制锁的获取与释放。Locksynchronized 提供了更灵活的控制,允许显式锁定和解锁。

示例:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockCounter {
    private int count = 0;
    private final Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();  // 手动加锁
        try {
            count++;
        } finally {
            lock.unlock();  // 保证解锁在 finally 中执行,避免死锁
        }
    }

    public int getCount() {
        return count;
    }
}
  • 手动加锁和解锁:在这个例子中,使用了 lock.lock()lock.unlock() 来手动控制对共享资源的访问。在 finally 块中解锁可以确保不管出现任何异常,锁都会被释放,防止死锁。
  • ReentrantLock:这是最常用的 Lock 实现,它是可重入的,表示同一线程可以多次获取锁。

显式锁的特点

  • 灵活性:显式锁提供了更多灵活的功能,如 tryLock(),可以非阻塞地尝试获取锁。
  • 手动释放锁:使用 Lock 时,必须确保在代码的 finally 块中手动释放锁,否则可能导致死锁。

5.3.3. 线程安全的集合

Java 提供了线程安全的集合类,这些类内部已经实现了同步机制,可以确保多线程环境下的安全访问。常见的线程安全集合包括:

  • ConcurrentHashMap:一个线程安全的哈希表,适合在高并发场景下使用,具有比 Hashtable 更高的性能。
  • CopyOnWriteArrayList:一个线程安全的 ArrayList 实现,适合多读少写的场景。

示例:使用线程安全的集合

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMapExample {
    private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    public void updateValue(String key, int value) {
        map.put(key, value);  // 线程安全
    }

    public int getValue(String key) {
        return map.get(key);
    }
}
  • ConcurrentHashMap 使用了分段锁机制,可以在高并发场景下确保性能和线程安全。
  • 适用于多线程环境:线程安全的集合类已经内置了同步机制,因此可以直接在并发环境中使用,避免了显式加锁。

5.3.4. 原子类(Atomic Classes)

Java 提供了一些 原子类,如 AtomicIntegerAtomicLongAtomicReference 等,它们可以确保对变量的操作是原子性的,避免了线程安全问题。

示例:使用 AtomicInteger

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter {
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
        count.incrementAndGet();  // 原子性自增操作
    }

    public int getCount() {
        return count.get();
    }
}
  • AtomicIntegerincrementAndGet() 是一个原子操作,它确保在多个线程并发访问时,自增操作是安全的。
  • 无锁机制:原子类内部使用了无锁的 CAS(Compare-And-Swap)机制,性能比加锁更高。

5.3.5. 线程局部变量 (ThreadLocal)

ThreadLocal 是 Java 提供的一种解决线程安全问题的机制,它为每个线程创建了单独的变量副本。与使用锁不同,ThreadLocal 不需要同步,因为每个线程都有自己独立的变量存储。

ThreadLocal 的特点:

  • 每个线程独立的副本ThreadLocal 保证每个线程都拥有自己独立的变量副本,线程间不会相互干扰。
  • 避免锁竞争:由于每个线程有自己独立的变量,因此不需要使用锁来同步,从而避免了线程竞争和性能开销。
  • 常用于线程范围内的状态存储:比如用户会话信息、数据库连接、事务等,都可以用 ThreadLocal 来存储。

示例:使用 ThreadLocal 维护线程独立的计数器

public class ThreadLocalCounter {
    // 创建一个 ThreadLocal 变量,类型为 Integer,初始值为 0
    private ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);

    // increment() 方法:每个线程调用该方法时,增加它自己的 count 值
    public void increment() {
        // get() 方法用于获取当前线程的 count 副本
        // set() 方法将当前线程的 count 值增加 1
        count.set(count.get() + 1);
    }

    // getCount() 方法:返回当前线程的 count 值
    public int getCount() {
        // 通过 ThreadLocal 的 get() 方法获取当前线程的变量值
        return count.get();
    }
}

详细说明:

  • ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);

    • 这里我们使用 ThreadLocal 来定义一个类型为 Integer 的线程局部变量 countThreadLocal.withInitial() 方法会为每个线程设置一个初始值。在这个例子中,每个线程的 count 初始值都是 0
  • public void increment()

    • 这是一个 increment() 方法,用于增加当前线程独立的 count 值。
    • count.get():每个线程通过 get() 方法从 ThreadLocal 中获取自己独立的 count 值。
    • count.set(count.get() + 1):获取当前线程的 count 值后,将其加 1,然后使用 set() 方法更新当前线程的 count 值。
    • 注意:ThreadLocal 确保每个线程有独立的副本,因此不同线程调用该方法时并不会修改其他线程的 count 值。
  • public int getCount()

    • 这个方法用于返回当前线程的 count 值。
    • count.get():获取当前线程的 count 值并返回。由于 ThreadLocal 的特性,不同线程获取到的 count 值是互相独立的。

示例场景:

假设我们在多个线程中使用该 ThreadLocalCounter,每个线程都会独立地对 count 进行操作,且不会影响其他线程的 count。比如:

public class ThreadLocalDemo {
    public static void main(String[] args) {
        ThreadLocalCounter counter = new ThreadLocalCounter();

        // 创建多个线程,每个线程独立操作自己的 count 值
        Runnable task = () -> {
            for (int i = 0; i < 5; i++) {
                counter.increment();
                System.out.println(Thread.currentThread().getName() + " : " + counter.getCount());
            }
        };

        // 启动三个线程,每个线程独立执行
        Thread t1 = new Thread(task, "Thread 1");
        Thread t2 = new Thread(task, "Thread 2");
        Thread t3 = new Thread(task, "Thread 3");

        t1.start();
        t2.start();
        t3.start();
    }
}

说明:

  • 每个线程都会从 ThreadLocal 获取自己的 count 副本,并在其上执行自增操作。
  • 由于每个线程都有自己独立的 count,所以在输出时,每个线程的 count 值不会与其他线程的值冲突,保证了线程安全。

ThreadLocal 的应用场景:

  • 数据库连接管理:通过 ThreadLocal 维护每个线程独立的数据库连接,避免多个线程争用同一连接。
  • 用户会话信息:在多线程环境中,使用 ThreadLocal 存储每个线程独立的用户会话信息,确保线程独立的用户状态。
  • 事务管理:使用 ThreadLocal 保存每个线程的事务状态,确保线程安全地管理事务。

5.3.6.总结

方法特点适用场景
synchronized保证同一时刻只有一个线程执行共享资源的操作小规模同步操作
显式锁 (Lock)提供更灵活的锁控制,允许显式获取和释放锁需要复杂同步控制的场景
线程安全集合内部已经实现了同步机制,适合在高并发环境中使用高并发访问的集合操作
原子类 (Atomic Classes)通过无锁的 CAS 机制实现原子性操作,适合对单一变量进行并发操作单一变量的高效并发更新
线程局部变量 (ThreadLocal)为每个线程创建独立的变量副本,避免共享资源同步问题线程独立状态的存储和管理

这种方式特别适用于那些每个线程有独立状态而不需要与其他线程共享资源的场景。

5.4 线程安全问题的关键点

  1. 共享资源:当多个线程访问共享资源时,必须确保对资源的访问是同步的,否则可能会引发数据不一致问题。
  2. 同步机制:可以使用 synchronized 关键字、显式锁(如 ReentrantLock)、线程安全的集合类或原子类来确保线程安全。
  3. 性能权衡:虽然加锁可以解决线程安全问题,但过多的锁可能导致性能下降。因此,应该根据具体场景选择适当的同步策略。
  • 死锁与活锁:线程因为相互等待资源而无法继续执行时可能会发生死锁,理解避免死锁的基本策略如锁的顺序控制等。

6.死锁与活锁

在并发编程中,死锁活锁是两种常见的问题,导致多个线程无法继续执行。这两个问题虽然都是由于线程之间的相互等待或资源争夺引起的,但它们的表现和解决方式有所不同。下面我们来详细讲解死锁与活锁的概念、发生的原因以及如何避免它们。


6.1 死锁

6.1.1. 什么是死锁?

死锁是指两个或多个线程在执行过程中,因相互等待对方持有的资源而陷入无限等待的状态,导致所有线程都无法继续执行。

在死锁发生时,线程之间形成了一个循环依赖,每个线程都持有其他线程所需要的资源,同时等待其他线程释放资源,因此所有线程都无法继续执行。

6.1.2. 死锁发生的必要条件

死锁通常发生在以下四个条件同时满足的情况下,这些条件被称为死锁的四个必要条件

  • 互斥:至少有一个资源是不能被多个线程同时访问的,即资源是独占的。
  • 占有并等待:线程已经持有至少一个资源,同时还在请求其他线程持有的资源,并且该资源没有被释放。
  • 不可抢占:线程所持有的资源在未完成任务之前,不能被其他线程强制剥夺,只有当线程主动释放资源时,资源才可以被其他线程使用。
  • 循环等待:存在一个线程集合 {T1, T2, ..., Tn},其中每个线程都在等待下一个线程所持有的资源,形成了一个等待的循环。

6.1.3. 死锁示例

以下是一个简单的死锁示例,展示了两个线程因为互相等待对方持有的资源而陷入死锁状态。

public class DeadlockExample {

    private final Object lock1 = new Object();  // 资源1
    private final Object lock2 = new Object();  // 资源2

    public void method1() {
        synchronized (lock1) {  // 线程1持有lock1
            System.out.println("Thread1: Holding lock1...");

            // 模拟一些工作
            try { Thread.sleep(100); } catch (InterruptedException e) {}

            synchronized (lock2) {  // 尝试获取lock2
                System.out.println("Thread1: Holding lock1 and lock2...");
            }
        }
    }

    public void method2() {
        synchronized (lock2) {  // 线程2持有lock2
            System.out.println("Thread2: Holding lock2...");

            // 模拟一些工作
            try { Thread.sleep(100); } catch (InterruptedException e) {}

            synchronized (lock1) {  // 尝试获取lock1
                System.out.println("Thread2: Holding lock2 and lock1...");
            }
        }
    }

    public static void main(String[] args) {
        DeadlockExample deadlock = new DeadlockExample();

        // 线程1试图执行method1,获取lock1后尝试获取lock2
        new Thread(deadlock::method1).start();

        // 线程2试图执行method2,获取lock2后尝试获取lock1
        new Thread(deadlock::method2).start();
    }
}

解释

  • 线程1调用 method1(),首先获取 lock1,然后等待获取 lock2
  • 线程2调用 method2(),首先获取 lock2,然后等待获取 lock1
  • 由于线程1和线程2相互持有对方需要的锁,形成了一个循环等待,导致两个线程都陷入了死锁。

6.1.4. 如何避免死锁?

为了避免死锁,可以采取以下策略:

1. 固定资源的获取顺序

确保所有线程以相同的顺序获取资源,避免形成循环等待。例如,所有线程总是先获取 lock1,再获取 lock2。这样即使多个线程同时获取资源,也不会导致死锁。

示例:按顺序获取资源

public class DeadlockPreventionExample {

    private final Object lock1 = new Object();
    private final Object lock2 = new Object();

    public void method1() {
        synchronized (lock1) {  // 先获取lock1
            System.out.println("Thread1: Holding lock1...");
            synchronized (lock2) {  // 再获取lock2
                System.out.println("Thread1: Holding lock1 and lock2...");
            }
        }
    }

    public void method2() {
        synchronized (lock1) {  // 也先获取lock1
            System.out.println("Thread2: Holding lock1...");
            synchronized (lock2) {  // 再获取lock2
                System.out.println("Thread2: Holding lock1 and lock2...");
            }
        }
    }
}

解释

  • 无论是线程1还是线程2,都按照相同的顺序获取锁,先获取 lock1,再获取 lock2,从而避免了循环等待,避免了死锁。
2. 尝试获取锁的超时机制

使用 LocktryLock() 方法可以尝试获取锁,并设置超时时间。如果在指定时间内没有获取到锁,则放弃获取。这种方式可以避免线程无限期等待资源。

示例:使用 tryLock() 避免死锁

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;

public class TryLockExample {

    private final Lock lock1 = new ReentrantLock();
    private final Lock lock2 = new ReentrantLock();

    public void method1() {
        try {
            if (lock1.tryLock(50, TimeUnit.MILLISECONDS)) {
                try {
                    System.out.println("Thread1: Holding lock1...");
                    if (lock2.tryLock(50, TimeUnit.MILLISECONDS)) {
                        try {
                            System.out.println("Thread1: Holding lock1 and lock2...");
                        } finally {
                            lock2.unlock();
                        }
                    } else {
                        System.out.println("Thread1: Could not get lock2, releasing lock1");
                    }
                } finally {
                    lock1.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void method2() {
        try {
            if (lock2.tryLock(50, TimeUnit.MILLISECONDS)) {
                try {
                    System.out.println("Thread2: Holding lock2...");
                    if (lock1.tryLock(50, TimeUnit.MILLISECONDS)) {
                        try {
                            System.out.println("Thread2: Holding lock2 and lock1...");
                        } finally {
                            lock1.unlock();
                        }
                    } else {
                        System.out.println("Thread2: Could not get lock1, releasing lock2");
                    }
                } finally {
                    lock2.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

解释

  • 通过 tryLock() 方法,线程会尝试在超时时间内获取锁。如果无法获取锁,它将放弃并释放已经持有的锁,从而避免了死锁。

6.2 活锁

6.2.1. 什么是活锁?

活锁是指线程虽然没有阻塞(没有死锁),但因为线程之间相互配合导致线程无法继续执行下去。线程不断地改变状态或释放资源,但却没有完成任务。

与死锁不同,活锁中的线程并没有等待资源被释放,而是由于不断地重新尝试某些操作或由于外部条件的变化,导致线程陷入一个活跃但无法完成的循环状态。

6.2.2. 活锁的发生原因

活锁通常发生在多个线程试图纠正某种问题的情况下。如果多个线程都不断地进行自我纠正操作,但操作并没有取得实际进展,就会导致活锁。活锁的一个常见原因是,线程在发生冲突时都试图让步或退让,结果它们始终让步,没有一个线程能够完成任务。

6.2.3. 活锁的示例

以下是一个简单的活锁示例,其中两个线程互相让步,但却导致它们都无法完成工作:

public class LivelockExample {

    static class Worker {
        private boolean active = false;

        public boolean isActive() {
            return active;
        }

        public void setActive(boolean active) {
            this.active = active;
        }

        public void work(Worker otherWorker) {
            while (active) {
                if (otherWorker.isActive()) {
                    System.out.println("Worker is giving way to the other worker.");
                    try { Thread.sleep(100); } catch (InterruptedException e) {}
                    continue;
                }
                System.out.println("Working...");
                break;
            }
        }
    }

    public static void main(String[] args) {
        Worker worker1 = new Worker();
        Worker worker2 = new Worker();

        worker1.setActive(true);
        worker2.setActive(true);

        // 两个线程不断地让步,导致活锁
        new Thread(() -> worker1.work(worker2)).start();
        new Thread(() -> worker2.work(worker1)).start();
    }
}

解释

  • worker1worker2 都处于活跃状态,并且它们在工作时互相检查对方是否活跃。
  • 如果另一个 Worker 也是活跃的,它们会让出工作机会并等待对方让步,导致两个线程都无法继续工作,形成了活锁。

6.2.4. 如何避免活锁?

为了避免活锁问题,可以采取以下两种常见的策略:

1. 引入随机等待

在发生活锁的场景下,每个线程都会为了避免冲突而让步,导致两个或多个线程在不断地尝试并退让。为了打破这种死循环,可以引入随机等待时间,让线程在让步时不总是立即尝试下一次操作,而是等待一段随机的时间。

引入随机等待可以让不同的线程在不同的时间重新尝试操作,降低冲突的概率,从而打破活锁的循环。

示例:使用随机等待避免活锁

import java.util.Random;

public class LivelockAvoidance {

    static class Worker {
        private boolean active = false;

        public boolean isActive() {
            return active;
        }

        public void setActive(boolean active) {
            this.active = active;
        }

        public void work(Worker otherWorker) {
            Random random = new Random();
            while (active) {
                if (otherWorker.isActive()) {
                    System.out.println(Thread.currentThread().getName() + ": Giving way...");
                    try {
                        // 引入随机等待时间,减少冲突
                        Thread.sleep(random.nextInt(100));  
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }
                System.out.println(Thread.currentThread().getName() + ": Working...");
                break;
            }
        }
    }

    public static void main(String[] args) {
        Worker worker1 = new Worker();
        Worker worker2 = new Worker();

        worker1.setActive(true);
        worker2.setActive(true);

        // 两个线程使用随机等待,减少活锁几率
        new Thread(() -> worker1.work(worker2), "Worker 1").start();
        new Thread(() -> worker2.work(worker1), "Worker 2").start();
    }
}

解释

  • 通过引入 Thread.sleep(random.nextInt(100)),线程在冲突时随机等待一段时间再尝试操作,减少了线程之间让步的同步性,从而降低了活锁发生的几率。
2. 降低线程间的相互依赖

活锁通常是由于线程之间相互依赖的状态变化导致的。因此,设计时可以尝试减少线程之间的相互依赖,或者减少线程之间的频繁交互。降低线程之间的依赖性,可以减少相互等待或让步的场景,从而降低活锁发生的可能性。

6.3 总结

  • 死锁:发生在多个线程相互等待对方持有的资源,导致所有线程都无法继续执行。解决死锁的常见方法包括:固定资源获取顺序、使用 tryLock() 以及避免循环等待。
  • 活锁:虽然线程在不断尝试执行操作,但由于线程之间相互让步或纠正操作,最终没有线程能够完成任务。避免活锁的策略包括:引入随机等待和减少线程之间的相互依赖。

7. Spring Boot 中的异步处理

Spring Boot 提供了多种方式来实现异步任务处理,这对于需要提高应用程序性能、响应性或执行长时间任务时非常有帮助。异步处理主要应用在以下两种场景中:

  1. 异步事件处理:通过事件发布机制(如 Spring 的 ApplicationEvent)实现任务异步执行,从而解耦业务逻辑,提升应用的可维护性。
  2. 异步 HTTP 请求:用于处理需要长时间运行的任务,通过异步的 REST API 请求避免阻塞服务器的主线程,提升系统的并发处理能力。

7.1 异步事件处理

7.1.1. 什么是异步事件处理?

在很多应用中,任务的执行顺序不一定需要严格同步,尤其是那些不影响主流程的任务,可以通过事件驱动的方式解耦主流程。例如,在用户注册完成后,发送欢迎邮件或日志记录等不需要立即返回结果的任务,可以通过异步事件处理完成。

Spring 的事件驱动模型允许在发布事件时,通知多个监听器。通过这种方式,事件发布者与事件处理者(监听器)之间不直接耦合,且可以通过异步方式处理耗时的任务,避免阻塞主线程。

7.1.2. 实现异步事件处理的步骤

步骤概述

  1. 创建一个自定义事件类,继承 ApplicationEvent
  2. 创建一个事件发布者,用于发布事件。
  3. 创建一个事件监听器,处理该事件。
  4. 使用 @Async 实现异步事件处理。

7.1.3. 异步事件处理的示例

1. 自定义事件类

首先,创建一个自定义事件类,继承 ApplicationEvent,并定义事件的内容。

import org.springframework.context.ApplicationEvent;

// 自定义事件类,继承 ApplicationEvent
public class UserRegistrationEvent extends ApplicationEvent {
    private String username;

    public UserRegistrationEvent(Object source, String username) {
        super(source);
        this.username = username;
    }

    public String getUsername() {
        return username;
    }
}
  •  source 参数代表事件的源(通常是触发事件的对象)。在 publishEvent() 方法中,this 被作为 source 传递给了 UserRegistrationEvent 构造函数,表明事件是由当前对象触发的。
  • UserRegistrationEvent 继承自 ApplicationEventsuper(source) 会调用父类 ApplicationEvent 的构造方法,将 source 赋值给 ApplicationEvent 中的 source 属性。
  • ApplicationEvent 会将这个事件源(this)存储起来,以便在后续处理事件时,监听器能够知道事件是由哪个对象发布的。
2. 事件发布者

创建一个发布者类,负责在用户注册成功后发布 UserRegistrationEvent 事件。

示例一:通过依赖注入的方式直接注入 ApplicationEventPublisher 
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;

@Component
public class UserRegistrationPublisher {

    @Autowired
    private ApplicationEventPublisher eventPublisher;  // 通过 @Autowired 注入

    public void publishEvent(String username) {
        UserRegistrationEvent event = new UserRegistrationEvent(this, username);
        eventPublisher.publishEvent(event);  // 发布事件
    }
}
  • eventPublisher.publishEvent(event):这里的 eventPublisherApplicationEventPublisher,它是 Spring 用来发布事件的工具。调用 publishEvent() 方法会将事件发布到 Spring 容器中,所有监听该事件的监听器都会收到通知并执行相应的处理方法。 
示例二:实现 ApplicationEventPublisherAware接口类来注入ApplicationEventPublisher。

这种方式就必须要重写setApplicationEventPublisher方法

如果你不重写这个方法,eventPublisher 就无法被注入,你将无法发布事件。

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;

@Component
public class UserRegistrationPublisher implements ApplicationEventPublisherAware {
    private ApplicationEventPublisher eventPublisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
        this.eventPublisher = publisher;
    }

    public void publishEvent(String username) {
        // 发布自定义事件
        UserRegistrationEvent event = new UserRegistrationEvent(this, username);
        eventPublisher.publishEvent(event);
    }
}
  • this 指的是调用 publishEvent() 方法的对象本身,即 事件发布者。 
  • 实现 ApplicationEventPublisherAware 接口的为什么能将 ApplicationEventPublisher 注入到事件发布类中,用于发布事件:这源于 Spring 的 IoC(控制反转)机制和 Aware 接口的设计。当你实现某个 Aware 接口时,Spring 容器知道你需要一些特定的 Spring 组件(如 ApplicationEventPublisher),并在初始化时通过调用 setApplicationEventPublisher() 等方法为你注入这些组件。
  • 要重写setApplicationEventPublisher方法:当实现 ApplicationEventPublisherAware 接口时,Spring 会在启动时自动调用 setApplicationEventPublisher() 方法,并将 ApplicationEventPublisher 实例传递给你。这使得你的类可以在运行时发布事件。
3. 事件监听器

创建一个监听器类,负责处理 UserRegistrationEvent 事件,并使用 @Async 注解实现异步处理。

import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class UserRegistrationListener {

    // 使用 @Async 处理事件的监听
    @Async
    @EventListener
    public void handleUserRegistrationEvent(UserRegistrationEvent event) {
        System.out.println("异步处理用户注册事件,发送欢迎邮件给:" + event.getUsername());
        // 模拟耗时任务
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("欢迎邮件发送完成");
    }
}
  • 监听器方法只要被正确标注了 @EventListener,并且事件被发布, 监听器方法 handleUserRegistrationEvent(UserRegistrationEvent event) 的参数类型就是要监听的事件类型(这里的事件类型是自己定义的事件类)。Spring 会自动将发布的事件传递到监听器方法中作为参数。
4. 启用异步支持

为了让 @Async 生效,你需要在应用的配置类中启用异步处理支持。

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class AsyncConfig {
}
5. 测试异步事件
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {

    @Autowired
    private UserRegistrationPublisher publisher;

    @PostMapping("/register")
    public String registerUser(@RequestParam String username) {
        // 模拟用户注册
        System.out.println("用户注册成功:" + username);

        // 发布用户注册事件
        publisher.publishEvent(username);

        return "用户注册成功";
    }
}
6. 运行结果

当用户通过 /register 接口注册成功后,UserRegistrationPublisher 会发布用户注册事件,UserRegistrationListener 接收到事件后,异步执行耗时任务(如发送欢迎邮件)。主线程不会被耗时任务阻塞。

用户注册成功:test_user
异步处理用户注册事件,发送欢迎邮件给:test_user
欢迎邮件发送完成

7.2 异步 HTTP 请求

7.2.1. 什么是异步 HTTP 请求?

在一些场景中,服务器需要处理耗时较长的任务(如数据处理、文件上传、外部 API 调用等)。如果直接使用同步方式,服务器主线程将被阻塞,直到任务完成,这会导致服务器响应变慢,并发处理能力下降。

通过 Spring Boot 的异步 HTTP 请求支持,可以将长时间运行的任务交给异步处理线程,释放主线程,提升服务器的响应能力。

7.2.2. 实现异步 HTTP 请求的步骤

步骤概述

  1. 在控制器方法中返回 CompletableFuture,用于异步返回结果。
  2. 在后台任务中执行长时间任务,控制器不需要等待任务完成即可返回响应。
  3. 启用异步支持。

7.2.3. 异步 HTTP 请求的示例

1. 异步控制器

在控制器中,通过返回 CompletableFuture<T> 使 HTTP 请求异步执行。CompletableFuture 是 Java 提供的异步任务执行类,可以异步获取执行结果。

import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CompletableFuture;

@RestController
public class AsyncController {

    @Async  // 只需要使用 @Async 让整个方法异步执行
    @GetMapping("/process")
    public CompletableFuture<String> process() {
        System.out.println("主线程处理:" + Thread.currentThread().getName());

        try {
            Thread.sleep(5000);  // 模拟耗时任务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("异步线程处理:" + Thread.currentThread().getName());
        return CompletableFuture.completedFuture("处理完成");
    }
}
2. 启用异步支持

与异步事件处理一样,@Async 需要通过 @EnableAsync 来启用异步支持。

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class AsyncConfig {
}
3. 测试异步 HTTP 请求

访问 /process,你会发现主线程立即返回,而后台线程在异步处理长时间任务。

主线程处理:http-nio-8080-exec-1
(5秒后)
异步线程处理:ForkJoinPool.commonPool-worker-1

解释

  • 主线程在处理完请求后立即返回,服务器可以继续处理其他请求,而后台线程异步执行耗时任务,等任务完成后返回结果给客户端。

7.3 异步处理的优势

  • 提升性能:异步处理将长时间的任务交给后台线程处理,不会阻塞主线程,从而提升系统的并发处理能力。
  • 解耦任务执行:异步事件处理可以解耦业务逻辑中的事件触发与任务执行,增强系统的可维护性和灵活性。
  • 更快响应:通过异步 HTTP 请求,服务器能够更快地响应客户端请求,同时在后台处理耗时任务。

8. 使用 Spring Boot Actuator 进行监控

Spring Boot Actuator 是一个非常强大的工具,允许你轻松地监控和管理 Spring Boot 应用程序的运行状况。Actuator 提供了多种监控端点,能够监控线程池、JVM、内存、CPU 使用率、数据库连接池等资源的状态。

在多线程调优与监控中,Actuator 可以帮助开发者实时获取线程池的状态、任务队列的情况、线程的活跃度等信息。这为系统调优提供了实时数据支持,便于发现问题并进行优化。

8.1. 设置 Spring Boot Actuator

首先,我们需要引入 Actuator 依赖,并在配置文件中启用相关的监控端点。

8.1.1 引入 Actuator 依赖

pom.xml 中添加 Spring Boot Actuator 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

8.1.2 配置 Actuator 端点

application.properties 中配置 Actuator 端点。你可以选择只暴露某些监控端点,或者将所有端点暴露。

# 暴露所有端点
management.endpoints.web.exposure.include=*

常用的 Actuator 监控端点:

  • /actuator/health:检查应用程序的健康状态。
  • /actuator/metrics:查看 JVM 和其他资源的指标数据。
  • /actuator/threads:监控线程的状态和数量。
  • /actuator/metrics/jvm.threads.live:获取 JVM 当前活跃线程数。
  • /actuator/metrics/jvm.threads.daemon:获取守护线程的数量。

8.2. 使用 Actuator 监控线程池

Spring Boot Actuator 默认提供了对 JVM 线程的监控,但如果你想监控自定义的线程池(如通过 ThreadPoolTaskExecutor 配置的线程池),你可以通过 Actuator 的自定义监控功能,将线程池的状态暴露出来。

8.2.1 使用 Actuator 监控 JVM 线程

Actuator 默认提供了对 JVM 线程的一些基本监控信息,你可以通过 /actuator/metrics 端点获取这些信息。

查看 JVM 活跃线程数

访问 http://localhost:8080/actuator/metrics/jvm.threads.live 可以查看当前 JVM 中活跃线程的数量,返回内容如下:

{
  "name": "jvm.threads.live",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 15
    }
  ],
  "availableTags": []
}
查看守护线程数

访问 http://localhost:8080/actuator/metrics/jvm.threads.daemon 可以查看 JVM 中守护线程的数量,返回内容如下:

{
  "name": "jvm.threads.daemon",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 10
    }
  ],
  "availableTags": []
}
查看线程状态

访问 http://localhost:8080/actuator/metrics/jvm.threads.states 可以查看不同线程状态(如 RUNNABLE、BLOCKED 等)的统计信息,返回内容如下:

{
  "name": "jvm.threads.states",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 8,
      "tag": "state",
      "tagValue": "RUNNABLE"
    },
    {
      "statistic": "VALUE",
      "value": 3,
      "tag": "state",
      "tagValue": "TIMED_WAITING"
    }
  ]
}

8.2.2 监控自定义线程池

虽然 Spring Boot Actuator 默认监控 JVM 线程,但如果你想监控自定义的线程池(如通过 ThreadPoolTaskExecutor 配置的线程池),你需要创建自定义的监控指标并将其暴露给 Actuator。

以下是如何监控自定义线程池状态的步骤:

1. 配置自定义线程池

首先,你需要创建一个自定义的 ThreadPoolTaskExecutor

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class ThreadPoolConfig {

    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);   // 核心线程数
        executor.setMaxPoolSize(10);   // 最大线程数
        executor.setQueueCapacity(25); // 队列容量
        executor.setKeepAliveSeconds(60); // 线程空闲时间
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }
}
2. 自定义监控指标

通过将自定义线程池的一些状态暴露为 Actuator 指标,我们可以对其进行监控。可以使用 MeterRegistry 注册自定义的指标。

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class ThreadPoolMonitor {

    @Autowired
    public ThreadPoolMonitor(MeterRegistry meterRegistry, ThreadPoolTaskExecutor taskExecutor) {
        Gauge.builder("thread.pool.size", taskExecutor, ThreadPoolTaskExecutor::getPoolSize)
                .description("The current size of the thread pool")
                .register(meterRegistry);

        Gauge.builder("thread.pool.active", taskExecutor, ThreadPoolTaskExecutor::getActiveCount)
                .description("The number of active threads in the thread pool")
                .register(meterRegistry);

        Gauge.builder("thread.pool.queue.size", taskExecutor, 
                executor -> executor.getThreadPoolExecutor().getQueue().size())
                .description("The number of tasks in the thread pool queue")
                .register(meterRegistry);
    }
}
3. 查看自定义指标

一旦注册了自定义指标,你可以通过 Actuator 的 /actuator/metrics 端点来查看这些自定义的线程池监控指标。例如:

  • 查看线程池的大小

    访问 http://localhost:8080/actuator/metrics/thread.pool.size

    {
      "name": "thread.pool.size",
      "measurements": [
        {
          "statistic": "VALUE",
          "value": 5
        }
      ],
      "availableTags": []
    }
    
  • 查看活跃线程数

    访问 http://localhost:8080/actuator/metrics/thread.pool.active

    {
      "name": "thread.pool.active",
      "measurements": [
        {
          "statistic": "VALUE",
          "value": 3
        }
      ],
      "availableTags": []
    }
    
  • 查看任务队列大小

    访问 http://localhost:8080/actuator/metrics/thread.pool.queue.size

    {
      "name": "thread.pool.queue.size",
      "measurements": [
        {
          "statistic": "VALUE",
          "value": 7
        }
      ],
      "availableTags": []
    }
    

通过这些自定义指标,你可以实时监控线程池的运行状态,调整线程池的配置,确保系统在高负载情况下仍能保持稳定运行。

8.3. 其他 Actuator 监控功能

除了线程池,Spring Boot Actuator 还提供了对应用程序其他方面的监控:

  • 内存监控:可以通过 /actuator/metrics/jvm.memory.used/actuator/metrics/jvm.memory.max 来监控 JVM 内存使用情况。
  • CPU 使用率:通过 /actuator/metrics/system.cpu.usage 来查看 CPU 使用率。
  • 数据库连接池监控:如果使用了数据库连接池,可以通过 /actuator/metrics 查看连接池的活跃连接数和最大连接数。

8.4. 总结

  • Spring Boot Actuator 是监控和管理 Spring Boot 应用程序的强大工具。它允许开发者通过简单的配置和自定义指标,实时监控线程池、JVM、内存等系统资源的运行状态。
  • 通过 Actuator 的 /metrics 端点,开发者可以轻松获取线程池的活跃线程数、队列大小、线程池大小等信息,帮助发现潜在的性能瓶颈并进行调优。
  • 除了线程池,Actuator 还支持对内存、CPU、数据库连接池等系统资源的监控,使得应用程序的运行状况更加透明。

9. 消息队列与异步处理

9.1. 什么是消息队列?

在分布式系统中,消息队列(如 RabbitMQ、Kafka、ActiveMQ 等)是一种非常重要的中间件,它可以用于实现系统间的异步通信。消息队列允许系统的不同组件以松耦合的方式进行数据传递。发送者(生产者)将消息发送到消息队列,消费者从消息队列中异步地消费消息进行处理。

消息队列的主要功能

  • 异步任务处理:消息队列允许生产者在发送消息后立即返回,而不需要等待消费者处理完成,避免了阻塞主线程。这提高了系统的并发处理能力,特别是在处理大量任务时。

  • 系统解耦:消息队列通过异步传递任务,将生产者和消费者之间的直接依赖关系消除。生产者只需将任务发送到消息队列,而消费者从队列中取出任务并处理,使得系统各组件可以独立扩展和维护。

  • 削峰填谷:在高并发场景下,消息队列充当缓冲器,生产者可以迅速将任务推送到队列中,而消费者可以以可控的速度逐步处理任务,从而平衡负载,防止系统过载。

  • 容错性:如果消费者暂时不可用,消息仍然可以保存在队列中,等待消费者恢复后继续处理,确保任务不会丢失。这种机制提高了系统的可靠性和容错能力。

9.2 通过 RabbitMQ 实现异步任务处理

典型流程

  1. 生产者发送消息:生产者(如订单服务)将任务信息发送到 RabbitMQ 交换器,RabbitMQ 再根据路由规则将消息分发到指定的队列。
  2. 消息持久化:RabbitMQ 可以将消息持久化到磁盘,确保在系统重启或崩溃时,消息不会丢失。
  3. 消费者异步消费:多个消费者(如邮件服务、支付服务)可以从队列中异步获取消息,并进行处理。这些消费者可以在不同的节点上运行,互不影响。

9.2.1 RabbitMQ 的引入

要在 Spring Boot 项目中使用 RabbitMQ,首先需要在项目的 pom.xml 中添加 RabbitMQ 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

spring-boot-starter-amqp 提供了 Spring 对 RabbitMQ 的支持,包含了处理消息队列的必要依赖和工具类。

9.2.2 配置 RabbitMQ

接下来,我们需要在 Spring Boot 应用中配置 RabbitMQ 的连接信息。可以通过在 application.properties中配置 RabbitMQ 服务器的连接信息。

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

这些配置告诉 Spring Boot 如何连接到 RabbitMQ 服务器。在实际项目中,你需要根据你的 RabbitMQ 环境替换为对应的服务器地址和认证信息。

9.2.3 创建 RabbitMQ 的 Exchange、Queue 和 Binding

在 RabbitMQ 中,消息从生产者发送到交换器(Exchange),交换器根据路由规则将消息发送到队列(Queue)。消费者从队列中获取消息进行处理。

在 Spring Boot 中,我们可以通过配置类创建这些组件:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 定义队列
    @Bean
    public Queue queue() {
        return new Queue("task_queue", true); // true 表示持久化队列
    }

    // 定义交换器
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("task_exchange");
    }

    // 将队列和交换器通过路由键绑定
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("task.routing.key");
    }
}

解释

  • 队列(Queue):创建一个名为 task_queue 的队列,并设置为持久化(防止 RabbitMQ 重启时消息丢失)。
  • 交换器(Exchange):创建一个名为 task_exchange 的交换器,类型为 Topic,它允许根据路由键有选择地发送消息到不同的队列。
  • 绑定(Binding):通过 task.routing.key 将交换器和队列进行绑定,表示消息通过这个路由键发送到指定队列。

9.2.4 生产者发送消息

生产者(Producer)是发送消息的服务。生产者可以通过 RabbitTemplate 向 RabbitMQ 发送消息。我们可以通过注入 RabbitTemplate 来发送消息:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TaskProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendTask(String taskMessage) {
        rabbitTemplate.convertAndSend("task_exchange", "task.routing.key", taskMessage);
        System.out.println("Sent task: " + taskMessage);
    }
}

解释

  • RabbitTemplate 是 Spring 提供的操作 RabbitMQ 的模板类,用于发送和接收消息。
  • convertAndSend 方法将消息发送到 task_exchange 交换器,使用 task.routing.key 路由键发送消息到绑定的队列。

9.2.5 消费者异步处理消息

消费者(Consumer)负责从队列中接收消息并异步处理。在 Spring Boot 中,我们可以使用 @RabbitListener 注解来监听队列中的消息,并异步处理这些消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class TaskConsumer {

    @RabbitListener(queues = "task_queue")
    public void receiveTask(String taskMessage) {
        System.out.println("Received task: " + taskMessage);
        // 模拟任务处理
        try {
            Thread.sleep(2000); // 模拟耗时任务
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Task processed: " + taskMessage);
    }
}

解释

  • @RabbitListener(queues = "task_queue"):通过注解声明消费者监听 task_queue 队列。当有消息进入队列时,消费者会自动接收并处理这些消息。
  • receiveTask 方法中,我们可以处理任务,并模拟异步任务的执行,比如一个耗时的处理操作。

9.2.6 通过异步任务提高处理性能

如果有大量任务需要处理,我们可以通过多线程来并发处理消息,以提高任务处理的速度和系统的吞吐量。我们可以通过配置一个线程池来管理多个消费者线程:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class TaskExecutorConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);  // 核心线程数
        executor.setMaxPoolSize(10);  // 最大线程数
        executor.setQueueCapacity(100); // 队列容量
        executor.setThreadNamePrefix("TaskExecutor-");
        executor.initialize();
        return executor;
    }
}

然后可以在 TaskConsumer 中使用这个线程池来并发处理多个任务:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class TaskConsumer {

    @Async("taskExecutor")  // 使用自定义线程池
    @RabbitListener(queues = "task_queue")
    public void receiveTask(String taskMessage) {
        System.out.println("Received task: " + taskMessage + " - Thread: " + Thread.currentThread().getName());
        // 模拟任务处理
        try {
            Thread.sleep(2000); // 模拟耗时任务
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Task processed: " + taskMessage);
    }
}

解释

  • @Async("taskExecutor"):指定使用我们配置的 taskExecutor 线程池,允许多个任务并行处理,提升任务处理的效率。
  • 通过线程池管理任务执行,能够确保系统在高并发情况下保持稳定,避免单线程处理的瓶颈。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部