死锁
2025/11/4大约 22 分钟
死锁 (Deadlock)
1. 死锁的概念
死锁是指在多线程或多进程环境中,两个或多个进程(线程)因为互相等待对方持有的资源而陷入无限等待的状态。当发生死锁时,这些进程(线程)都无法继续执行,整个系统可能因此无法正常工作。
死锁是并发系统中常见的问题,特别是在资源竞争激烈的环境下更容易发生。它是操作系统理论中的经典问题,也是并发编程中需要特别注意避免的情况。
2. 死锁的产生条件
死锁的产生必须同时满足以下四个必要条件,这四个条件被称为Coffman条件:
2.1 互斥条件(Mutual Exclusion)
- 资源不能被共享,只能由一个进程(线程)使用
- 例如:打印机、独占文件等资源
2.2 请求与保持条件(Hold and Wait)
- 进程(线程)已经保持了至少一个资源,但又提出了新的资源请求
- 而新请求的资源已被其他进程(线程)占有
- 此时请求进程(线程)会阻塞,但又不释放自己已占有的资源
2.3 不可剥夺条件(No Preemption)
- 进程(线程)已获得的资源,在未使用完之前,不能被强制剥夺
- 只能由获得该资源的进程(线程)主动释放
2.4 循环等待条件(Circular Wait)
- 存在一个进程(线程)链,使得每个进程(线程)都在等待下一个进程(线程)所持有的资源
- 例如:进程A等待进程B的资源,进程B等待进程C的资源,进程C等待进程A的资源
3. 死锁的危害
3.1 系统资源浪费
- 死锁进程占用的资源无法被释放,导致系统资源利用率下降
- 随着死锁进程数量增加,可用资源越来越少
3.2 系统吞吐量下降
- 死锁进程无法继续执行,导致系统处理能力降低
- 可能影响其他正常进程的执行
3.3 系统不稳定
- 严重情况下,可能导致整个系统无法正常工作
- 可能需要重启系统才能解决
3.4 数据不一致
- 在某些情况下,死锁可能导致数据处于不一致状态
- 特别是涉及数据库事务的情况
4. 死锁的处理策略
操作系统中处理死锁的策略主要分为四大类:
4.1 死锁预防(Deadlock Prevention)
- 通过破坏死锁的四个必要条件之一来防止死锁发生
4.2 死锁避免(Deadlock Avoidance)
- 在资源分配过程中,通过某种算法避免系统进入不安全状态
4.3 死锁检测与恢复(Deadlock Detection and Recovery)
- 允许系统进入死锁状态,然后通过检测机制发现死锁,并采取措施恢复
4.4 死锁忽略(Deadlock Ignorance)
- 某些系统(如大多数通用操作系统)采用的策略
- 简单地忽略死锁问题,依赖用户或管理员来处理
5. 死锁预防方法
5.1 破坏互斥条件
- 允许多个进程同时访问某些资源
- 但并不是所有资源都可以共享(如打印机等独占设备)
- 实际应用中,这一方法可行性较低
5.2 破坏请求与保持条件
- 一次性请求所有资源:进程在开始执行前,必须一次性请求并获得所有所需资源
- 资源预分配策略:适用于资源需求可以预先确定的情况
- 缺点:资源利用率低,可能导致饥饿(进程因无法一次性获取所有资源而长期等待)
// 资源预分配示例
public class ResourcePreallocation {
private Resource[] resources; // 所有可能需要的资源
private boolean[] allocated; // 资源分配状态
public synchronized boolean requestAllResources(int[] neededResources) {
// 检查是否所有资源都可用
for (int i = 0; i < neededResources.length; i++) {
if (neededResources[i] > resources[i].availableUnits) {
return false; // 无法一次性分配所有资源
}
}
// 一次性分配所有资源
for (int i = 0; i < neededResources.length; i++) {
resources[i].availableUnits -= neededResources[i];
allocated[i] = true;
}
return true;
}
public synchronized void releaseAllResources(int[] allocatedResources) {
// 释放所有资源
for (int i = 0; i < allocatedResources.length; i++) {
if (allocated[i]) {
resources[i].availableUnits += allocatedResources[i];
allocated[i] = false;
}
}
}
}5.3 破坏不可剥夺条件
- 资源剥夺:当一个进程请求新的资源而无法立即获得时,系统可以剥夺该进程已获得的某些资源
- 回退机制:进程回退到某个安全状态,释放已获得的资源
- 缺点:实现复杂,可能导致计算回退,影响系统性能
// 资源剥夺示例
public class ResourcePreemption {
private Map<Process, List<Resource>> processResourcesMap;
private PriorityQueue<Process> processQueue;
public synchronized boolean requestResource(Process process, Resource resource, int units) {
if (resource.availableUnits >= units) {
// 可以直接分配
allocateResource(process, resource, units);
return true;
} else {
// 尝试从低优先级进程剥夺资源
for (Process p : processResourcesMap.keySet()) {
if (p != process && p.getPriority() < process.getPriority()) {
List<Resource> resources = processResourcesMap.get(p);
for (Resource r : resources) {
if (r == resource && r.getUnitsHeldBy(p) >= units) {
// 执行剥夺
deallocateResource(p, resource, units);
allocateResource(process, resource, units);
return true;
}
}
}
}
return false; // 无法获得资源
}
}
private void allocateResource(Process process, Resource resource, int units) {
resource.allocateTo(process, units);
if (!processResourcesMap.containsKey(process)) {
processResourcesMap.put(process, new ArrayList<>());
}
processResourcesMap.get(process).add(resource);
}
private void deallocateResource(Process process, Resource resource, int units) {
resource.deallocateFrom(process, units);
// 更新映射
}
}5.4 破坏循环等待条件
- 资源有序分配:给所有资源类型分配一个全局唯一的编号
- 进程必须按照编号递增的顺序请求资源
- 示例:资源A编号为1,资源B编号为2,进程必须先请求资源A,再请求资源B
// 资源有序分配示例
public class OrderedResourceAllocation {
private Map<String, Resource> resources; // 按名称存储资源
// 资源按名称字母顺序排序(模拟编号递增)
private List<String> orderedResourceNames;
public synchronized boolean requestResourcesInOrder(Process process, List<String> requestedResourceNames) {
// 检查请求的资源名称是否按递增顺序排列
List<String> sortedNames = new ArrayList<>(requestedResourceNames);
Collections.sort(sortedNames);
if (!sortedNames.equals(requestedResourceNames)) {
System.out.println("错误:资源请求必须按递增顺序");
return false;
}
// 依次请求资源
for (String resourceName : requestedResourceNames) {
Resource resource = resources.get(resourceName);
if (resource == null || !resource.isAvailable()) {
// 如果任何一个资源不可用,释放已分配的资源
releaseResources(process, requestedResourceNames.subList(0, requestedResourceNames.indexOf(resourceName)));
return false;
}
resource.allocateTo(process);
}
return true;
}
private void releaseResources(Process process, List<String> resourceNames) {
for (String resourceName : resourceNames) {
Resource resource = resources.get(resourceName);
if (resource != null && resource.isHeldBy(process)) {
resource.release();
}
}
}
}6. 死锁避免方法
6.1 安全状态
- 安全状态:存在一个资源分配序列,使得所有进程都能顺利完成
- 不安全状态:不存在这样的分配序列,有可能发生死锁
- 死锁避免的核心:确保系统始终处于安全状态
6.2 银行家算法(Banker's Algorithm)
6.3 资源分配图算法
- 使用有向图表示进程和资源之间的关系
- 进程节点:用圆圈表示
- 资源节点:用方框表示,方框中的黑点表示资源实例
- 请求边:进程指向资源,表示进程请求资源
- 分配边:资源指向进程,表示资源已分配给进程
死锁定理
- 如果资源分配图中没有环路,则系统没有死锁
- 如果资源分配图中有环路,并且每个资源类型只有一个实例,则系统一定存在死锁
- 如果资源分配图中有环路,但存在某个资源类型有多个实例,则系统可能存在死锁
// 资源分配图简化实现
public class ResourceAllocationGraph {
private Set<ProcessNode> processes; // 进程集合
private Set<ResourceNode> resources; // 资源集合
private Map<ProcessNode, List<ResourceNode>> requestEdges; // 请求边
private Map<ResourceNode, List<ProcessNode>> allocationEdges; // 分配边
public ResourceAllocationGraph() {
processes = new HashSet<>();
resources = new HashSet<>();
requestEdges = new HashMap<>();
allocationEdges = new HashMap<>();
}
// 添加进程
public void addProcess(ProcessNode process) {
processes.add(process);
requestEdges.put(process, new ArrayList<>());
}
// 添加资源
public void addResource(ResourceNode resource) {
resources.add(resource);
allocationEdges.put(resource, new ArrayList<>());
}
// 添加请求边
public void addRequestEdge(ProcessNode process, ResourceNode resource) {
if (requestEdges.containsKey(process)) {
requestEdges.get(process).add(resource);
}
}
// 添加分配边
public void addAllocationEdge(ResourceNode resource, ProcessNode process) {
if (allocationEdges.containsKey(resource)) {
allocationEdges.get(resource).add(process);
// 实际分配资源
resource.allocateTo(process);
}
}
// 移除请求边
public void removeRequestEdge(ProcessNode process, ResourceNode resource) {
if (requestEdges.containsKey(process)) {
requestEdges.get(process).remove(resource);
}
}
// 移除分配边
public void removeAllocationEdge(ResourceNode resource, ProcessNode process) {
if (allocationEdges.containsKey(resource)) {
allocationEdges.get(resource).remove(process);
// 释放资源
resource.releaseFrom(process);
}
}
// 检测是否存在死锁(简化版,假设每种资源只有一个实例)
public boolean detectDeadlock() {
// 构建有向图的邻接表表示
Map<Object, List<Object>> graph = new HashMap<>();
// 添加所有节点
for (ProcessNode p : processes) {
graph.put(p, new ArrayList<>());
}
for (ResourceNode r : resources) {
graph.put(r, new ArrayList<>());
}
// 添加所有边
// 进程 -> 资源(请求边)
for (ProcessNode p : requestEdges.keySet()) {
for (ResourceNode r : requestEdges.get(p)) {
graph.get(p).add(r);
}
}
// 资源 -> 进程(分配边)
for (ResourceNode r : allocationEdges.keySet()) {
for (ProcessNode p : allocationEdges.get(r)) {
graph.get(r).add(p);
}
}
// 使用DFS检测环
Set<Object> visited = new HashSet<>();
Set<Object> recStack = new HashSet<>();
for (Object node : graph.keySet()) {
if (detectCycleUtil(node, visited, recStack, graph)) {
return true; // 存在环,即存在死锁
}
}
return false;
}
// DFS检测环的辅助方法
private boolean detectCycleUtil(Object node, Set<Object> visited,
Set<Object> recStack, Map<Object, List<Object>> graph) {
if (!visited.contains(node)) {
visited.add(node);
recStack.add(node);
for (Object neighbor : graph.get(node)) {
if (!visited.contains(neighbor) && detectCycleUtil(neighbor, visited, recStack, graph)) {
return true;
} else if (recStack.contains(neighbor)) {
return true;
}
}
}
recStack.remove(node);
return false;
}
// 进程节点类
static class ProcessNode {
private String name;
public ProcessNode(String name) {
this.name = name;
}
@Override
public String toString() {
return "Process[" + name + "]";
}
}
// 资源节点类
static class ResourceNode {
private String name;
private int instances;
private Set<ProcessNode> holders;
public ResourceNode(String name, int instances) {
this.name = name;
this.instances = instances;
this.holders = new HashSet<>();
}
public void allocateTo(ProcessNode process) {
if (holders.size() < instances) {
holders.add(process);
}
}
public void releaseFrom(ProcessNode process) {
holders.remove(process);
}
@Override
public String toString() {
return "Resource[" + name + ", instances=" + instances + "]";
}
}
// 示例
public static void main(String[] args) {
ResourceAllocationGraph graph = new ResourceAllocationGraph();
// 创建进程和资源
ProcessNode p1 = new ProcessNode("P1");
ProcessNode p2 = new ProcessNode("P2");
ResourceNode r1 = new ResourceNode("R1", 1);
ResourceNode r2 = new ResourceNode("R2", 1);
graph.addProcess(p1);
graph.addProcess(p2);
graph.addResource(r1);
graph.addResource(r2);
// 创建可能导致死锁的边
// P1持有R1,请求R2
graph.addAllocationEdge(r1, p1);
graph.addRequestEdge(p1, r2);
// P2持有R2,请求R1
graph.addAllocationEdge(r2, p2);
graph.addRequestEdge(p2, r1);
// 检测死锁
System.out.println("是否存在死锁: " + graph.detectDeadlock());
}
}7. 死锁检测与恢复
7.1 死锁检测算法
- 定期检查系统状态,判断是否存在死锁
- 检测时机:可以周期性检测,或在资源请求失败时检测
- 检测算法:类似于银行家算法,但不预先判断安全性,而是检查是否存在无法继续执行的进程
// 简化的死锁检测算法
public class DeadlockDetection {
private int[][] allocation; // 当前分配矩阵
private int[][] request; // 请求矩阵
private boolean[] finish; // 进程是否可完成
private int[] work; // 工作向量(可用资源)
private int numProcesses; // 进程数量
private int numResources; // 资源种类数量
public DeadlockDetection(int[][] allocation, int[][] request, int[] available) {
this.allocation = allocation;
this.request = request;
this.work = Arrays.copyOf(available, available.length);
this.numProcesses = allocation.length;
this.numResources = available.length;
this.finish = new boolean[numProcesses];
}
// 检测死锁
public List<Integer> detectDeadlocks() {
// 初始化finish数组
Arrays.fill(finish, false);
// 复制可用资源
int[] tempWork = Arrays.copyOf(work, numResources);
// 找出不需要任何资源的进程
for (int i = 0; i < numProcesses; i++) {
boolean allZero = true;
for (int j = 0; j < numResources; j++) {
if (allocation[i][j] != 0) {
allZero = false;
break;
}
}
if (allZero) {
finish[i] = true;
}
}
// 尝试找到可以执行完成的进程
boolean changed;
do {
changed = false;
for (int i = 0; i < numProcesses; i++) {
if (!finish[i]) {
boolean canExecute = true;
// 检查进程的请求是否可以被满足
for (int j = 0; j < numResources; j++) {
if (request[i][j] > tempWork[j]) {
canExecute = false;
break;
}
}
if (canExecute) {
// 进程可以执行完成,释放其资源
for (int j = 0; j < numResources; j++) {
tempWork[j] += allocation[i][j];
}
finish[i] = true;
changed = true;
}
}
}
} while (changed);
// 收集未完成的进程(死锁进程)
List<Integer> deadlockedProcesses = new ArrayList<>();
for (int i = 0; i < numProcesses; i++) {
if (!finish[i]) {
deadlockedProcesses.add(i);
}
}
return deadlockedProcesses;
}
// 示例
public static void main(String[] args) {
// 示例数据 - 两个进程相互等待资源
int[][] allocation = {
{1, 0}, // P0持有资源0
{0, 1} // P1持有资源1
};
int[][] request = {
{0, 1}, // P0请求资源1
{1, 0} // P1请求资源0
};
int[] available = {0, 0}; // 无可用资源
DeadlockDetection detector = new DeadlockDetection(allocation, request, available);
List<Integer> deadlocks = detector.detectDeadlocks();
if (!deadlocks.isEmpty()) {
System.out.println("检测到死锁!死锁进程: " + deadlocks);
} else {
System.out.println("未检测到死锁");
}
}
}7.2 死锁恢复方法
7.2.1 进程终止
- 终止所有死锁进程:简单但代价高
- 逐个终止进程:直到死锁消除,需要选择合适的终止顺序
- 选择终止顺序的原则:
- 进程优先级
- 进程执行时间
- 进程已使用的资源
- 进程完成还需要的资源
7.2.2 资源剥夺
- 从非死锁进程中剥夺资源,分配给死锁进程
- 选择原则:
- 最少剥夺原则
- 最小影响原则
- 优先级原则
7.2.3 进程回退
- 将进程回退到某个安全检查点
- 释放已获得的资源
- 重新启动进程
- 需要系统支持检查点机制
// 死锁恢复示例
public class DeadlockRecovery {
private List<Process> processes;
private List<Resource> resources;
private DeadlockDetection detector;
public DeadlockRecovery(List<Process> processes, List<Resource> resources) {
this.processes = processes;
this.resources = resources;
}
// 检测并恢复死锁
public void detectAndRecover() {
// 构建死锁检测所需的数据结构
int numProcesses = processes.size();
int numResources = resources.size();
int[][] allocation = new int[numProcesses][numResources];
int[][] request = new int[numProcesses][numResources];
int[] available = new int[numResources];
// 填充数据
for (int i = 0; i < numProcesses; i++) {
Process p = processes.get(i);
for (int j = 0; j < numResources; j++) {
allocation[i][j] = p.getAllocatedResources().get(j);
request[i][j] = p.getRequestedResources().get(j);
}
}
for (int j = 0; j < numResources; j++) {
available[j] = resources.get(j).getAvailableCount();
}
detector = new DeadlockDetection(allocation, request, available);
List<Integer> deadlockedProcesses = detector.detectDeadlocks();
if (!deadlockedProcesses.isEmpty()) {
System.out.println("检测到死锁!");
// 尝试通过终止进程恢复
recoverByTermination(deadlockedProcesses);
}
}
// 通过终止进程恢复死锁
private void recoverByTermination(List<Integer> deadlockedProcesses) {
// 按照优先级选择要终止的进程
deadlockedProcesses.sort((p1, p2) -> processes.get(p1).getPriority() - processes.get(p2).getPriority());
for (int pid : deadlockedProcesses) {
Process process = processes.get(pid);
System.out.println("终止进程: " + process.getName() + " (优先级: " + process.getPriority() + ")");
// 释放该进程持有的所有资源
releaseProcessResources(process);
// 从系统中移除该进程
processes.remove(pid);
// 重新检测死锁是否已消除
detectAndRecover();
break; // 一次只终止一个进程,然后重新检测
}
}
// 通过资源剥夺恢复死锁
private void recoverByPreemption(List<Integer> deadlockedProcesses) {
// 简化版:从非死锁进程中剥夺资源给死锁进程
List<Integer> nonDeadlocked = new ArrayList<>();
for (int i = 0; i < processes.size(); i++) {
if (!deadlockedProcesses.contains(i)) {
nonDeadlocked.add(i);
}
}
if (!nonDeadlocked.isEmpty()) {
// 选择一个非死锁进程进行剥夺
int victimIndex = nonDeadlocked.get(0);
Process victim = processes.get(victimIndex);
// 剥夺其一半资源
System.out.println("从进程 " + victim.getName() + " 剥夺资源");
for (int i = 0; i < resources.size(); i++) {
int allocated = victim.getAllocatedResources().get(i);
if (allocated > 0) {
int toPreempt = allocated / 2;
victim.getAllocatedResources().set(i, allocated - toPreempt);
resources.get(i).addAvailableCount(toPreempt);
}
}
// 重新检测死锁是否已消除
detectAndRecover();
}
}
// 释放进程资源
private void releaseProcessResources(Process process) {
for (int i = 0; i < resources.size(); i++) {
int allocated = process.getAllocatedResources().get(i);
if (allocated > 0) {
resources.get(i).addAvailableCount(allocated);
process.getAllocatedResources().set(i, 0);
}
}
}
// 进程类
static class Process {
private String name;
private int priority;
private List<Integer> allocatedResources;
private List<Integer> requestedResources;
public Process(String name, int priority, int numResources) {
this.name = name;
this.priority = priority;
this.allocatedResources = new ArrayList<>(Collections.nCopies(numResources, 0));
this.requestedResources = new ArrayList<>(Collections.nCopies(numResources, 0));
}
// getter方法
public String getName() { return name; }
public int getPriority() { return priority; }
public List<Integer> getAllocatedResources() { return allocatedResources; }
public List<Integer> getRequestedResources() { return requestedResources; }
}
// 资源类
static class Resource {
private String name;
private int availableCount;
public Resource(String name, int totalCount) {
this.name = name;
this.availableCount = totalCount;
}
public void addAvailableCount(int count) {
this.availableCount += count;
}
public int getAvailableCount() {
return availableCount;
}
}
// 示例
public static void main(String[] args) {
// 创建资源
List<Resource> resources = new ArrayList<>();
resources.add(new Resource("R1", 1));
resources.add(new Resource("R2", 1));
// 创建进程
List<Process> processes = new ArrayList<>();
processes.add(new Process("P0", 1, 2));
processes.add(new Process("P1", 2, 2));
// 制造死锁
processes.get(0).getAllocatedResources().set(0, 1); // P0持有R1
processes.get(0).getRequestedResources().set(1, 1); // P0请求R2
processes.get(1).getAllocatedResources().set(1, 1); // P1持有R2
processes.get(1).getRequestedResources().set(0, 1); // P1请求R1
// 减少可用资源计数
resources.get(0).addAvailableCount(-1);
resources.get(1).addAvailableCount(-1);
// 检测并恢复
DeadlockRecovery recovery = new DeadlockRecovery(processes, resources);
recovery.detectAndRecover();
}
}8. 实际应用中的死锁避免策略
8.1 超时机制
- 在尝试获取锁时设置超时时间
- 如果在指定时间内无法获取锁,则放弃请求,释放已获得的资源,稍后重试
- Java中的
Lock.tryLock(timeout, unit)方法支持超时获取锁
// 使用超时机制避免死锁
public class TimeoutLockExample {
private Lock lock1 = new ReentrantLock();
private Lock lock2 = new ReentrantLock();
public void method1() {
boolean gotLock1 = false;
boolean gotLock2 = false;
try {
// 尝试获取第一个锁,设置超时时间
gotLock1 = lock1.tryLock(100, TimeUnit.MILLISECONDS);
if (gotLock1) {
System.out.println(Thread.currentThread().getName() + " 获取了锁1");
// 故意添加延迟,增加死锁可能性,用于演示
Thread.sleep(50);
// 尝试获取第二个锁,设置超时时间
gotLock2 = lock2.tryLock(100, TimeUnit.MILLISECONDS);
if (gotLock2) {
System.out.println(Thread.currentThread().getName() + " 获取了锁2");
// 执行临界区操作
System.out.println(Thread.currentThread().getName() + " 执行临界区操作");
} else {
System.out.println(Thread.currentThread().getName() + " 无法在超时时间内获取锁2");
}
} else {
System.out.println(Thread.currentThread().getName() + " 无法在超时时间内获取锁1");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放已获取的锁
if (gotLock2) {
lock2.unlock();
System.out.println(Thread.currentThread().getName() + " 释放了锁2");
}
if (gotLock1) {
lock1.unlock();
System.out.println(Thread.currentThread().getName() + " 释放了锁1");
}
}
}
public void method2() {
// 类似method1,但先尝试获取锁2
boolean gotLock2 = false;
boolean gotLock1 = false;
try {
gotLock2 = lock2.tryLock(100, TimeUnit.MILLISECONDS);
if (gotLock2) {
System.out.println(Thread.currentThread().getName() + " 获取了锁2");
Thread.sleep(50);
gotLock1 = lock1.tryLock(100, TimeUnit.MILLISECONDS);
if (gotLock1) {
System.out.println(Thread.currentThread().getName() + " 获取了锁1");
System.out.println(Thread.currentThread().getName() + " 执行临界区操作");
} else {
System.out.println(Thread.currentThread().getName() + " 无法在超时时间内获取锁1");
}
} else {
System.out.println(Thread.currentThread().getName() + " 无法在超时时间内获取锁2");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (gotLock1) {
lock1.unlock();
System.out.println(Thread.currentThread().getName() + " 释放了锁1");
}
if (gotLock2) {
lock2.unlock();
System.out.println(Thread.currentThread().getName() + " 释放了锁2");
}
}
}
public static void main(String[] args) {
TimeoutLockExample example = new TimeoutLockExample();
// 创建两个线程,分别执行method1和method2
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
example.method1();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "线程1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
example.method2();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "线程2");
t1.start();
t2.start();
}
}8.2 顺序锁获取
- 所有线程按照相同的顺序获取锁
- 消除循环等待条件
- 是预防死锁的有效方法,实现简单
// 使用顺序锁获取避免死锁
public class OrderedLockExample {
private Lock lock1 = new ReentrantLock();
private Lock lock2 = new ReentrantLock();
// 定义锁的获取顺序:先获取lock1,再获取lock2
public void methodWithOrderedLocks() {
// 按照固定顺序获取锁
lock1.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取了锁1");
// 模拟业务操作
Thread.sleep(50);
lock2.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取了锁2");
// 执行临界区操作
System.out.println(Thread.currentThread().getName() + " 执行临界区操作");
} finally {
lock2.unlock();
System.out.println(Thread.currentThread().getName() + " 释放了锁2");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock1.unlock();
System.out.println(Thread.currentThread().getName() + " 释放了锁1");
}
}
// 即使是不同的方法,也要遵循相同的锁获取顺序
public void anotherMethodWithOrderedLocks() {
// 仍然按照相同的顺序获取锁:先lock1,再lock2
lock1.lock();
try {
System.out.println(Thread.currentThread().getName() + " 在另一个方法中获取了锁1");
// 模拟业务操作
Thread.sleep(30);
lock2.lock();
try {
System.out.println(Thread.currentThread().getName() + " 在另一个方法中获取了锁2");
// 执行另一个临界区操作
System.out.println(Thread.currentThread().getName() + " 执行另一个临界区操作");
} finally {
lock2.unlock();
System.out.println(Thread.currentThread().getName() + " 在另一个方法中释放了锁2");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock1.unlock();
System.out.println(Thread.currentThread().getName() + " 在另一个方法中释放了锁1");
}
}
// 使用锁排序工具类的更通用方法
public void methodWithLockUtils(Object resource1, Object resource2) {
LockPair locks = LockUtils.getLockPair(resource1, resource2);
locks.first.lock();
try {
locks.second.lock();
try {
// 执行需要两个资源的操作
System.out.println(Thread.currentThread().getName() + " 使用两个资源执行操作");
} finally {
locks.second.unlock();
}
} finally {
locks.first.unlock();
}
}
// 锁排序工具类
static class LockUtils {
// 根据对象的哈希码确定锁的获取顺序
public static LockPair getLockPair(Object resource1, Object resource2) {
int hash1 = System.identityHashCode(resource1);
int hash2 = System.identityHashCode(resource2);
if (hash1 < hash2) {
return new LockPair((Lock)resource1, (Lock)resource2);
} else {
return new LockPair((Lock)resource2, (Lock)resource1);
}
}
static class LockPair {
public final Lock first;
public final Lock second;
public LockPair(Lock first, Lock second) {
this.first = first;
this.second = second;
}
}
}
public static void main(String[] args) {
OrderedLockExample example = new OrderedLockExample();
// 创建多个线程执行相同或不同的方法
for (int i = 0; i < 3; i++) {
Thread t1 = new Thread(() -> {
for (int j = 0; j < 3; j++) {
example.methodWithOrderedLocks();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "线程-A-" + i);
Thread t2 = new Thread(() -> {
for (int j = 0; j < 3; j++) {
example.anotherMethodWithOrderedLocks();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "线程-B-" + i);
t1.start();
t2.start();
}
}
}8.3 锁的层次结构
- 为锁定义层次结构,确保不会出现跨层次的锁获取
- 适用于大型系统,便于管理
8.4 使用无锁数据结构
- 使用并发集合类(如
ConcurrentHashMap、CopyOnWriteArrayList等) - 这些数据结构内部实现了线程安全,无需显式加锁
// 使用无锁数据结构避免死锁
public class LockFreeDataStructuresExample {
// 使用ConcurrentHashMap - 线程安全的HashMap实现
private ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
// 使用CopyOnWriteArrayList - 读多写少场景的线程安全List
private CopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList<>();
// 使用AtomicInteger - 原子操作类
private AtomicInteger counter = new AtomicInteger(0);
// 使用ConcurrentLinkedQueue - 无界线程安全队列
private ConcurrentLinkedQueue<String> concurrentQueue = new ConcurrentLinkedQueue<>();
// 演示ConcurrentHashMap的使用
public void demonstrateConcurrentMap() {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
String key = "key-" + Thread.currentThread().getName().hashCode() % 10;
// 原子地更新值
concurrentMap.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
// 获取当前值
Integer value = concurrentMap.get(key);
if (i % 100 == 0) {
System.out.println(Thread.currentThread().getName() + ": " + key + " = " + value);
}
}
};
// 创建多个线程同时操作map
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(task, "MapThread-" + i);
threads[i].start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("最终ConcurrentHashMap内容: " + concurrentMap);
}
// 演示CopyOnWriteArrayList的使用
public void demonstrateCopyOnWriteList() {
// 写操作线程
Runnable writer = () -> {
for (int i = 0; i < 10; i++) {
String element = Thread.currentThread().getName() + "-element-" + i;
copyOnWriteList.add(element);
System.out.println(Thread.currentThread().getName() + " 添加: " + element);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 读操作线程
Runnable reader = () -> {
for (int i = 0; i < 20; i++) {
System.out.println(Thread.currentThread().getName() + " 读取列表大小: " + copyOnWriteList.size());
// 安全地遍历列表,即使列表在遍历过程中被修改
for (String element : copyOnWriteList) {
System.out.println(Thread.currentThread().getName() + " 读取: " + element);
break; // 只打印第一个元素,避免输出过多
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 创建写线程和读线程
Thread writerThread = new Thread(writer, "Writer");
Thread readerThread1 = new Thread(reader, "Reader1");
Thread readerThread2 = new Thread(reader, "Reader2");
writerThread.start();
readerThread1.start();
readerThread2.start();
// 等待线程完成
try {
writerThread.join();
readerThread1.join();
readerThread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("最终列表内容: " + copyOnWriteList);
}
// 演示原子操作类的使用
public void demonstrateAtomicOperations() {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
// 原子地递增计数器
int value = counter.incrementAndGet();
// 原子地比较并设置
boolean updated;
do {
int current = counter.get();
updated = counter.compareAndSet(current, current + 1);
} while (!updated);
if (i % 200 == 0) {
System.out.println(Thread.currentThread().getName() + ": 计数器值 = " + counter.get());
}
}
};
// 创建多个线程同时操作计数器
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(task, "AtomicThread-" + i);
threads[i].start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("最终计数器值: " + counter.get());
// 期望值应该是 5 * 1000 * 2 = 10000
}
public static void main(String[] args) {
LockFreeDataStructuresExample example = new LockFreeDataStructuresExample();
System.out.println("===== 演示ConcurrentHashMap =====");
example.demonstrateConcurrentMap();
System.out.println("\n===== 演示CopyOnWriteArrayList =====");
example.demonstrateCopyOnWriteList();
System.out.println("\n===== 演示原子操作类 =====");
example.demonstrateAtomicOperations();
}
}8.5 死锁检测工具
- 使用专业的死锁检测工具监控系统
- Java中的JConsole、VisualVM等工具可以检测线程死锁
- 在Linux系统中,可以使用
ps、jstack等命令分析
9. 死锁案例分析
9.1 经典生产者-消费者死锁
// 生产者-消费者死锁示例
public class ProducerConsumerDeadlock {
private List<String> buffer = new ArrayList<>();
private final int MAX_BUFFER_SIZE = 5;
private Object lock1 = new Object();
private Object lock2 = new Object();
// 生产者 - 错误实现,可能导致死锁
public void produce(String item) throws InterruptedException {
synchronized (lock1) { // 先获取lock1
while (buffer.size() == MAX_BUFFER_SIZE) {
System.out.println("缓冲区已满,生产者等待...");
lock1.wait(); // 在lock1上等待
}
// 错误:在持有lock1的情况下尝试获取lock2
synchronized (lock2) { // 嵌套锁获取
buffer.add(item);
System.out.println("生产者添加: " + item + ", 缓冲区大小: " + buffer.size());
lock2.notifyAll(); // 在lock2上通知
}
}
}
// 消费者 - 错误实现,可能导致死锁
public String consume() throws InterruptedException {
synchronized (lock2) { // 先获取lock2
while (buffer.isEmpty()) {
System.out.println("缓冲区为空,消费者等待...");
lock2.wait(); // 在lock2上等待
}
// 错误:在持有lock2的情况下尝试获取lock1
synchronized (lock1) { // 嵌套锁获取,但顺序与生产者相反
String item = buffer.remove(0);
System.out.println("消费者移除: " + item + ", 缓冲区大小: " + buffer.size());
lock1.notifyAll(); // 在lock1上通知
return item;
}
}
}
// 修复后的生产者实现
public void produceFixed(String item) throws InterruptedException {
synchronized (lock1) { // 保持一致的锁获取顺序
synchronized (lock2) { // 先lock1,再lock2
while (buffer.size() == MAX_BUFFER_SIZE) {
System.out.println("缓冲区已满,生产者等待...");
lock2.wait(); // 使用内部锁等待
}
buffer.add(item);
System.out.println("生产者添加: " + item + ", 缓冲区大小: " + buffer.size());
lock2.notifyAll();
}
}
}
// 修复后的消费者实现
public String consumeFixed() throws InterruptedException {
synchronized (lock1) { // 保持一致的锁获取顺序
synchronized (lock2) { // 先lock1,再lock2
while (buffer.isEmpty()) {
System.out.println("缓冲区为空,消费者等待...");
lock2.wait();
}
String item = buffer.remove(0);
System.out.println("消费者移除: " + item + ", 缓冲区大小: " + buffer.size());
lock2.notifyAll();
return item;
}
}
}
public static void main(String[] args) {
ProducerConsumerDeadlock example = new ProducerConsumerDeadlock();
// 创建生产者线程
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 使用修复后的方法
example.produceFixed("产品-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 使用修复后的方法
example.consumeFixed();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
producerThread.start();
consumerThread.start();
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("程序完成");
}
}9.2 数据库事务死锁
- 多个事务并发执行时,可能因为资源竞争导致死锁
- 解决方法:事务按相同顺序访问资源,设置超时机制等
10. 总结
死锁是并发系统中的一个复杂问题,需要从多个角度进行预防、避免、检测和恢复。在实际开发中,应当优先考虑预防和避免策略,因为检测和恢复通常成本更高。
10.1 选择合适的死锁处理策略
- 小型系统:可以采用更严格的预防策略
- 大型系统:可能需要权衡性能和安全性,采用避免或检测恢复策略
- 实时系统:通常需要使用预防或避免策略,确保系统可靠性
10.2 最佳实践
- 始终按照固定顺序获取锁
- 使用超时机制
- 最小化锁的持有时间
- 避免嵌套锁
- 使用并发集合类和原子操作
- 定期监控系统,检测潜在的死锁问题
通过合理的设计和实现,可以有效减少死锁发生的可能性,提高系统的稳定性和性能。