Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

第 67 章 Spring Integration

目录

67.1. Spring Redis Lock
67.1.1. Maven 依赖
67.1.2. 配置锁
67.1.3. 使用方法
67.1.4. Service 中使用方法
67.1.5. 在定时任务中使用
67.1.6. 在Controller中使用
67.1.7. 使用模板方法模式封装
67.2. MQTT Support
67.2.1. 入站消息通道适配器
67.2.2. 出站通道适配器
67.2.3. @MessagingGateway 定义消息网关接口
67.2.4. 手动 ACK 应答
67.2.5. Spring boot with Mqtt v5
	
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>	
	
	

67.1. Spring Redis Lock

67.1.1. Maven 依赖

			
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-redis</artifactId>
        </dependency>			
			
			

67.1.2. 配置锁

			
package cn.netkiller.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;

/**
 * Spring configuration that exposes a Redis-backed {@link RedisLockRegistry} bean.
 */
@Configuration
public class RedisLockRegistryConfiguration {

    /** Registry key handed to the {@link RedisLockRegistry} constructor. */
    private static final String REGISTRY_KEY = "netkiller-lock";

    /**
     * Builds the lock registry on top of the supplied Redis connection factory.
     *
     * @param redisConnectionFactory connection factory passed to the registry
     * @return a registry created with the {@code "netkiller-lock"} registry key
     */
    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, REGISTRY_KEY);
        return registry;
    }
}
			
			
			

配置锁的过期时间(expireAfter,默认为 60 秒),下例设置为 10 分钟

			
  /**
   * Builds a {@link RedisLockRegistry} with the registry key {@code "neo-lock"} and passes
   * ten minutes, expressed in milliseconds, as the third constructor argument.
   *
   * @param redisConnectionFactory connection factory passed to the registry
   * @return the configured registry; {@code destroy} is declared as its destroy method
   */
  @Bean(destroyMethod = "destroy")
  public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
    final long expireAfterMillis = TimeUnit.MINUTES.toMillis(10);
    return new RedisLockRegistry(redisConnectionFactory, "neo-lock", expireAfterMillis);
  }
			
			

67.1.3. 使用方法

通过 Autowired 注解使用 RedisLockRegistry

			
@Autowired
private RedisLockRegistry redisLockRegistry;

// Obtain the lock registered under the device key and try to take it without waiting.
Lock deviceLock = redisLockRegistry.obtain(device);
boolean acquired = deviceLock.tryLock();
if (!acquired) {
    // perform alternative actions
} else {
    try {
        // manipulate protected state
    } finally {
        // Release only on the path where the lock was actually taken.
        deviceLock.unlock();
    }
}
			
			
			
@Autowired
private RedisLockRegistry redisLockRegistry;

Lock lock = redisLockRegistry.obtain(key);
// Tracks whether this thread took the lock, so that finally only unlocks what it owns.
boolean locked = false;
try {
  locked = lock.tryLock();
  if (locked) {
    // Logic that must run while holding the lock.
  } else {
    // Logic for the case where the lock could not be acquired.
    // Kept in its own branch so the protected logic never runs without the lock.
  }
} finally {
  if (locked) {
    lock.unlock();
  }
}
			
			

如果锁尚未被占用,则立即上锁并返回 true;如果锁已被占用,则最多阻塞等待 10 秒:等待期间获得锁返回 true,超时仍未获得则返回 false

			
    /**
     * Tries to acquire the lock registered under {@code device}, waiting up to 10 seconds.
     *
     * <p>The lock is not released by this method: when it returns {@code true} the calling
     * thread holds the lock afterwards.
     *
     * @param device key under which the lock is obtained from the registry
     * @return {@code true} if the lock was acquired within the wait time; {@code false} on
     *         timeout, interruption, or any other failure while acquiring
     */
    public boolean isLock(String device) {

        Lock lock = redisLockRegistry.obtain(device);
        boolean status = false;
        try {
            status = lock.tryLock(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for lock {}", device, e);
        } catch (Exception e) {
            // Best effort, as before: any other failure is reported as "not locked",
            // but the full exception is logged instead of only its message.
            log.warn("Failed to acquire lock {}", device, e);
        }
        log.warn("status: {} <<<<<<<<<<", status);
        return status;
    }
			
			

方法二,通过构造方法注入 RedisLockRegistry

			
/**
 * Example service that receives the {@link RedisLockRegistry} through constructor injection
 * and runs a piece of work while holding a distributed lock.
 */
@Service
public class DistributedLockService {

    private final RedisLockRegistry redisLockRegistry;

    public DistributedLockService(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }

    /**
     * Runs the simulated business logic under the lock identified by {@code lockKey}.
     *
     * <p>Waits up to 10 seconds for the lock. {@link Lock} has no per-call lease argument;
     * how long the lock is held in Redis is governed by the registry's expiry setting.
     *
     * @param lockKey key under which the lock is obtained from the registry
     */
    public void processWithLock(String lockKey) {
        // Obtain the lock object for this key.
        Lock lock = redisLockRegistry.obtain(lockKey);

        try {
            // Lock only offers tryLock() and tryLock(time, unit); a three-argument
            // tryLock(wait, lease, unit) does not exist on java.util.concurrent.locks.Lock.
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
                try {
                    // Lock acquired: run the business logic.
                    System.out.println("获得锁,执行业务逻辑");
                    // Simulated processing time.
                    Thread.sleep(5000);
                } finally {
                    // Always release the lock, even if the business logic throws.
                    lock.unlock();
                    System.out.println("释放锁");
                }
            } else {
                System.out.println("获取锁失败");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            System.out.println("线程被中断");
        }
    }
}
			
			

67.1.4. Service 中使用方法

			
/**
 * Collection of the different ways a lock obtained from {@link RedisLockRegistry} can be
 * acquired through the {@link Lock} interface.
 *
 * <p>The {@code tryLock*} examples only acquire; a {@code true} result means the calling
 * thread now holds the lock and is responsible for unlocking it.
 */
@Service
public class LockExamples {

    private final RedisLockRegistry redisLockRegistry;

    public LockExamples(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }

    /**
     * Option 1: try to acquire the lock immediately, without waiting.
     *
     * @param key lock key
     * @return {@code true} if the lock was acquired
     */
    public boolean tryLockImmediately(String key) {
        Lock lock = redisLockRegistry.obtain(key);
        return lock.tryLock();
    }

    /**
     * Option 2: try to acquire the lock, waiting up to 5 seconds.
     *
     * @param key lock key
     * @return {@code true} if the lock was acquired within the wait time
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public boolean tryLockWithTimeout(String key) throws InterruptedException {
        Lock lock = redisLockRegistry.obtain(key);
        return lock.tryLock(5, TimeUnit.SECONDS);
    }

    /**
     * Option 3: try to acquire the lock, waiting up to 10 seconds.
     *
     * <p>{@link Lock} has no {@code tryLock(wait, lease, unit)} overload, so a lease cannot
     * be passed per call; the time the lock lives in Redis is governed by the registry's
     * expiry setting.
     *
     * @param key lock key
     * @return {@code true} if the lock was acquired within the wait time
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public boolean tryLockWithLease(String key) throws InterruptedException {
        Lock lock = redisLockRegistry.obtain(key);
        return lock.tryLock(10, TimeUnit.SECONDS);
    }

    /**
     * Option 4: block until the lock is acquired. Not recommended, because the caller may
     * wait indefinitely.
     *
     * @param key lock key
     */
    public void lockBlockingly(String key) {
        Lock lock = redisLockRegistry.obtain(key);
        lock.lock(); // waits until the lock becomes available
        try {
            // business logic
        } finally {
            lock.unlock();
        }
    }
}
			
			

67.1.5. 在定时任务中使用

			
/**
 * Scheduled job that uses a distributed lock so that only the instance holding the lock
 * runs the task body.
 */
@Component
public class ScheduledTask {

    private final RedisLockRegistry redisLockRegistry;

    public ScheduledTask(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }

    /**
     * Runs every 5 minutes and executes the task only if the lock can be taken immediately.
     *
     * <p>{@link Lock} has no per-call lease argument; how long the lock is held in Redis is
     * governed by the registry's expiry setting, which should exceed the task's run time.
     */
    @Scheduled(cron = "0 */5 * * * ?") // every 5 minutes
    public void executeScheduledTask() {
        String lockKey = "scheduled-task-lock";
        Lock lock = redisLockRegistry.obtain(lockKey);

        // Non-blocking attempt. Lock.tryLock() does not declare InterruptedException,
        // so no interrupt handling is needed here.
        if (!lock.tryLock()) {
            System.out.println("其他实例正在执行定时任务");
            return;
        }

        try {
            // Only the instance holding the lock reaches this point.
            System.out.println("开始执行定时任务...");
            // task logic
        } finally {
            lock.unlock();
        }
    }
}
			
			

67.1.6. 在Controller中使用

			
/**
 * REST controller that serialises processing of a resource with a distributed lock keyed by
 * the resource id.
 */
@RestController
@RequestMapping("/api")
public class LockController {

    private final RedisLockRegistry redisLockRegistry;

    public LockController(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }

    /**
     * Processes the resource while holding its lock.
     *
     * <p>Waits up to 3 seconds for the lock. {@link Lock} has no per-call lease argument;
     * the time the lock lives in Redis is governed by the registry's expiry setting.
     *
     * @param resourceId id of the resource, used to build the lock key
     * @return 200 on success, 409 if the lock could not be acquired in time, 500 if the
     *         thread was interrupted while waiting
     */
    @PostMapping("/process/{resourceId}")
    public ResponseEntity<String> processResource(@PathVariable String resourceId) {
        String lockKey = "resource-lock:" + resourceId;
        Lock lock = redisLockRegistry.obtain(lockKey);

        try {
            if (!lock.tryLock(3, TimeUnit.SECONDS)) {
                return ResponseEntity.status(HttpStatus.CONFLICT)
                    .body("资源正在被处理,请稍后重试");
            }
            try {
                // process the resource
                return ResponseEntity.ok("处理成功: " + resourceId);
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("处理中断");
        }
    }
}
			
			

67.1.7. 使用模板方法模式封装

			
/**
 * Template that wraps the acquire / run / release sequence around a callback so callers do
 * not repeat the locking boilerplate.
 */
@Component
public class DistributedLockTemplate {

    private final RedisLockRegistry redisLockRegistry;

    public DistributedLockTemplate(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }

    /**
     * Runs {@code supplier} under the lock, using a wait time of 5 and a lease time of 30.
     *
     * @param lockKey  key under which the lock is obtained
     * @param supplier work to run while holding the lock
     * @param <T>      result type of the supplier
     * @return the supplier's result
     */
    public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
        return executeWithLock(lockKey, 5, 30, supplier);
    }

    /**
     * Runs {@code supplier} under the lock, waiting up to {@code waitTime} seconds for it.
     *
     * @param lockKey   key under which the lock is obtained
     * @param waitTime  maximum time to wait for the lock, in seconds
     * @param leaseTime kept for source compatibility; {@link Lock} has no per-call lease
     *                  argument, so the lock's lifetime in Redis is governed by the
     *                  registry's expiry setting instead
     * @param supplier  work to run while holding the lock
     * @param <T>       result type of the supplier
     * @return the supplier's result
     * @throws IllegalStateException if the lock is not acquired in time or the thread is
     *                               interrupted while waiting
     */
    public <T> T executeWithLock(String lockKey, long waitTime, long leaseTime, Supplier<T> supplier) {
        Lock lock = redisLockRegistry.obtain(lockKey);

        boolean acquired;
        try {
            // Lock only offers tryLock() and tryLock(time, unit).
            acquired = lock.tryLock(waitTime, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and keep the cause for the caller.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("获取锁时被中断", e);
        }

        if (!acquired) {
            throw new IllegalStateException("获取分布式锁失败: " + lockKey);
        }

        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Variant for work that returns nothing.
     *
     * @param lockKey  key under which the lock is obtained
     * @param runnable work to run while holding the lock
     */
    public void executeWithLock(String lockKey, Runnable runnable) {
        executeWithLock(lockKey, () -> {
            runnable.run();
            return null;
        });
    }
}

// 使用示例
/**
 * Usage example: a service that delegates locking to {@link DistributedLockTemplate}.
 */
@Service
public class BusinessService {

    private final DistributedLockTemplate lockTemplate;

    public BusinessService(DistributedLockTemplate lockTemplate) {
        this.lockTemplate = lockTemplate;
    }

    /**
     * Runs the inventory update for one product through the lock template.
     *
     * @param productId product whose inventory is updated; also part of the lock key
     * @param quantity  quantity reported in the update message
     */
    public void updateInventory(String productId, int quantity) {
        final String inventoryLockKey = "inventory:" + productId;
        lockTemplate.executeWithLock(inventoryLockKey, () -> {
            // inventory update logic
            String message = "更新产品 " + productId + " 的库存: " + quantity;
            System.out.println(message);
            return null;
        });
    }
}