美團一面:項目中使用過Redis嗎?
引言
Redis,作為一種開源的、基于內(nèi)存且支持持久化的鍵值存儲系統(tǒng),以其卓越的性能、豐富靈活的數(shù)據(jù)結(jié)構(gòu)和高度可擴展性在全球范圍內(nèi)廣受歡迎。Redis不僅提供了一種簡單直觀的方式來存儲和檢索數(shù)據(jù),更因其支持數(shù)據(jù)結(jié)構(gòu)如字符串、哈希、列表、集合、有序集合等多種類型,使得其在眾多場景下表現(xiàn)出強大的適用性和靈活性。
Redis的核心特點包括:
- 高性能:基于內(nèi)存操作,讀寫速度極快,特別適用于對性能要求高的實時應(yīng)用。
- 數(shù)據(jù)持久化:支持RDB和AOF兩種持久化方式,確保即使在服務(wù)器重啟后也能恢復(fù)數(shù)據(jù)。
- 分布式的特性:通過主從復(fù)制、哨兵模式或集群模式,Redis可以輕松地構(gòu)建高可用和可擴展的服務(wù)。
- 豐富的數(shù)據(jù)結(jié)構(gòu):提供了多種數(shù)據(jù)結(jié)構(gòu)支持,便于開發(fā)人員根據(jù)實際需求進行數(shù)據(jù)建模和處理。
Redis的廣泛應(yīng)用跨越了多個行業(yè)和技術(shù)領(lǐng)域,諸如網(wǎng)站加速、緩存服務(wù)、會話管理、實時統(tǒng)計、排行榜、消息隊列、分布式鎖、社交網(wǎng)絡(luò)功能、限流控制等。本文將深入探討Redis在這些場景下的具體應(yīng)用方法及其背后的工作原理,旨在幫助開發(fā)者更好地理解和掌握Redis,以應(yīng)對各種復(fù)雜的業(yè)務(wù)需求,并充分發(fā)揮其潛能。同時,我們也將關(guān)注如何在實踐中平衡Redis的性能、安全性、一致性等方面的挑戰(zhàn),為實際項目帶來更高的價值。
數(shù)據(jù)緩存
在高并發(fā)訪問的場景下,數(shù)據(jù)庫經(jīng)常成為系統(tǒng)的瓶頸。Redis因其內(nèi)存存儲、讀取速度快的特點,常被用作數(shù)據(jù)庫查詢結(jié)果的緩存層,有效降低數(shù)據(jù)庫負載,提高整體系統(tǒng)的響應(yīng)速度。這也是我們使用場景頻率最高的一個。
通常我們選擇使用String類型來存儲數(shù)據(jù)庫查詢結(jié)果,如單個實體對象的JSON序列化形式。
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Product> redisTemplate;
// 使用@Cacheable注解進行緩存
@Cacheable(value = "productCache", key = "#id")
public Product getProductById(String id) {
// 此處是從數(shù)據(jù)庫或其他數(shù)據(jù)源獲取商品的方法
// 在實際場景中,如果緩存命中,則不會執(zhí)行下面的數(shù)據(jù)庫查詢邏輯
return getProductFromDatabase(id);
}
}
而使用Redis作為緩存使用時,有一些特別需要注意的事項:
- 緩存穿透:當查詢的數(shù)據(jù)在數(shù)據(jù)庫和緩存中均不存在時,可能會導(dǎo)致大量的無效請求直接打到數(shù)據(jù)庫??赏ㄟ^布隆過濾器預(yù)防緩存穿透。
- 緩存雪崩:若大量緩存在同一時刻失效,所有請求都會涌向數(shù)據(jù)庫,造成瞬時壓力過大??赏ㄟ^設(shè)置合理的過期時間分散、預(yù)加載或采用Redis集群等方式避免。
- 緩存一致性:當數(shù)據(jù)庫數(shù)據(jù)發(fā)生變化時,需要及時更新緩存,避免數(shù)據(jù)不一致??梢圆捎弥鲃痈虏呗裕ㄈ绫O(jiān)聽數(shù)據(jù)庫binlog)或被動更新策略(如在讀取時判斷數(shù)據(jù)新鮮度)。
而對于數(shù)據(jù)緩存,我們常使用的業(yè)務(wù)場景如熱點數(shù)據(jù)存儲、全頁緩存等。
會話管理
在說會話管理之前,我們來簡單介紹一下Spring Session。Spring Session 是 Spring Framework 的一個項目,旨在簡化分布式應(yīng)用程序中的會話管理。在傳統(tǒng)的基于 Servlet 的應(yīng)用程序中,會話管理是通過 HttpSession 接口實現(xiàn)的,但在分布式環(huán)境中,每個節(jié)點上的 HttpSession 不能簡單地共享,因此需要一種機制來管理會話并確保會話在集群中的一致性。
Spring Session 提供了一種簡單的方法來解決這個問題,它將會話數(shù)據(jù)從容器(如 Tomcat 或 Jetty)中分離出來,并存儲在外部數(shù)據(jù)存儲(如 Redis、MongoDB、JDBC 等)中。這樣,不同節(jié)點上的應(yīng)用程序?qū)嵗梢怨蚕硐嗤臅挃?shù)據(jù),實現(xiàn)分布式環(huán)境下的會話管理。
所以在Web應(yīng)用中,Redis用于會話管理時,可以取代傳統(tǒng)基于服務(wù)器內(nèi)存或Cookie的會話存儲方案。通過將會話數(shù)據(jù)序列化后存儲為Redis中的鍵值對,實現(xiàn)跨多個服務(wù)器實例的會話共享。
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
<version>3.2.0</version>
</dependency>
然后我們在啟動類中,使用@EnableRedisHttpSession啟用Redis作為會話存儲。
@Configuration
@EnableRedisHttpSession
public class RedisSessionConfig {
@Bean
public RedisConnectionFactory connectionFactory() {
// 這里假設(shè)你已經(jīng)在application.properties或application.yml中配置了Redis的信息
// 根據(jù)實際情況填寫Redis服務(wù)器地址、端口等信息
return new LettuceConnectionFactory();
}
}
以上是一個簡單的Spring Session使用Redis進行會話管理的示例代碼。通過這種方式,我們可以輕松地在分布式環(huán)境中管理會話,并確保會話數(shù)據(jù)的一致性和可靠性。如果需要了解一些具體的用法,請自行參考Spring Session。
排行榜與計分板
有序集合(Sorted Sets)是Redis的一種強大數(shù)據(jù)結(jié)構(gòu),可以用來實現(xiàn)動態(tài)排行榜,每個成員都有一個分數(shù),按分數(shù)排序。有序集合中的每一個成員都有一個分數(shù)(score),成員依據(jù)其分數(shù)進行排序,且成員本身是唯一的。
當需要給某個用戶增加積分或改變其排名時,可以使用ZADD命令向有序集合中添加或更新成員及其分數(shù)。例如,ZADD leaderboard score member,這里的ranking是有序集合的名稱,score是用戶的積分值,member是用戶ID。
查詢排行榜時,可以使用ZRANGE命令獲取指定范圍內(nèi)的成員及其分數(shù),例如,ZRANGE ranking 0 -1 WITHSCORES,這條命令會返回集合中所有的成員及其對應(yīng)的分數(shù),按照分數(shù)從低到高排序。
若要按照分數(shù)從高到低顯示排行榜,使用ZREVRANGE命令,如ZREVRANGE ranking 0 -1 WITHSCORES。
@Service
public class RankingService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addToRanking(String playerName, int score) {
redisTemplate.opsForZSet().add("ranking", playerName, score);
}
public List<RankingInfo> getRanking() {
List<RankingInfo> rankingInfos = new ArrayList<>();
Set<ZSetOperations.TypedTuple<String>> rankingSet = redisTemplate.opsForZSet().rangeWithScores("ranking", 0, -1);
for (ZSetOperations.TypedTuple<String> tuple : rankingSet) {
RankingInfo rankingInfo = new RankingInfo();
rankingInfo.setPlayerName(tuple.getValue());
rankingInfo.setScore(tuple.getScore().intValue());
rankingInfos.add(rankingInfo);
System.out.println("playerName: " + tuple.getValue() + ", score: " + tuple.getScore().intValue());
}
return rankingInfos;
}
}
我們模擬請求,往redis中填入一些數(shù)據(jù),在獲取排行榜:
圖片
在實際場景中,有序集合非常適合處理實時動態(tài)變化的排行榜數(shù)據(jù),比如京東的月度銷量榜單、商品按時間的上新排行榜等,因為它的更新和查詢操作都是原子性的,并且能高效地支持按分數(shù)排序的操作。
計數(shù)器與統(tǒng)計
Redis的原子性操作如INCR和DECR可以用于計數(shù),確保在高并發(fā)環(huán)境下的計數(shù)準確性。比如在流量統(tǒng)計、電商網(wǎng)站商品的瀏覽量、視頻網(wǎng)站視頻的播放數(shù)贊等場景的應(yīng)用。
@Service
public class CounterService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void incrementLikeCount(String postId) {
redisTemplate.opsForValue().increment(postId + ":likes");
}
public void decrementLikeCount(String postId) {
redisTemplate.opsForValue().decrement(postId + ":likes");
}
public long getLikeCount(String postId) {
String value = redisTemplate.opsForValue().get(postId + ":likes");
return StringUtils.isBlank(value) ? 0 : Long.parseLong(value);
}
}
在使用Redis實現(xiàn)點贊,統(tǒng)計等功能時一定要考慮設(shè)置計數(shù)值的最大值或最小值限制,以及過期策略。
分布式鎖
分布式鎖
Redis的SETNX(設(shè)置并檢查是否存在)和EXPIRE命令組合可以實現(xiàn)分布式鎖,因其操作時原子性的,所以可以確保在分布式環(huán)境下同一資源只能被一個客戶端修改。
使用 Redis 實現(xiàn)分布式鎖通常會使用 Redis 的 SETNX 命令。這個命令用于設(shè)置一個鍵的值,如果這個鍵不存在的話,它會設(shè)置成功并返回 1,如果這個鍵已經(jīng)存在,則設(shè)置失敗并返回 0。結(jié)合 Redis 的 EXPIRE 命令,可以為這個鍵設(shè)置一個過期時間,確保即使獲取鎖的客戶端異常退出,鎖也會在一段時間后自動釋放。
@Component
public class DistributedLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean acquireLock(String lockKey, String requestId, long expireTime) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime);
return result != null && result;
}
public void releaseLock(String lockKey, String requestId) {
String value = redisTemplate.opsForValue().get(lockKey);
if (value != null && value.equals(requestId)) {
redisTemplate.delete(lockKey);
}
}
}
使用分布式鎖時,務(wù)必確保在加鎖和解鎖操作之間處理完臨界區(qū)代碼,否則可能出現(xiàn)死鎖。并且要注意鎖定超時時間應(yīng)當合理設(shè)置,以避免鎖定資源長時間無法釋放。
關(guān)于分布式鎖,推薦使用一些第三方的分布式鎖框架,例如Redisson
全局ID
在全局ID生成的場景中,我們可以使用 Redis 的原子遞增操作來實現(xiàn)。通過對 Redis 中的一個特定的 key 進行原子遞增操作,可以確保生成的ID是唯一的。
@Component
public class UniqueIdGenerator {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public long generateUniqueId(String key) {
return redisTemplate.opsForValue().increment(key, 1);
}
}
庫存扣減
在扣減庫存的場景中,我們可以使用 Redis 的原子遞減操作來實現(xiàn)。將庫存數(shù)量存儲在 Redis 的一個特定key中(例如倉庫編碼:SKU),然后通過遞減操作來實現(xiàn)庫存的扣減。這樣可以保證在高并發(fā)情況下,庫存扣減的原子性。
@Component
public class StockService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**商品庫存的key*/
private static final String STOCK_PREFIX = "stock:%s:%s";
/**
* 扣減庫存
* @param warehouseCode
* @param productId
* @param quantity
* @return
*/
public boolean decreaseStock(String warehouseCode, String productId, long quantity) {
String key = String.format(STOCK_PREFIX, warehouseCode, productId);
Long stock = redisTemplate.opsForValue().decrement(key, quantity);
return stock >= 0;
}
}
秒殺
在秒殺場景中,使用Lua腳本。Lua 腳本可以在 Redis 服務(wù)器端原子性地執(zhí)行多個命令,這樣可以避免在多個命令之間出現(xiàn)競態(tài)條件。
我們使用Lua腳本來檢查庫存是否足夠并進行扣減操作。如果庫存足夠,則減少庫存并返回 true;如果庫存不足,則直接返回 false。通過 Lua 腳本的原子性執(zhí)行,可以確保在高并發(fā)情況下,庫存扣減操作的正確性和一致性。
我們先定義一個扣減庫存的lua腳本,使用Lua腳本一次性執(zhí)行獲取庫存、判斷庫存是否充足以及扣減庫存這三個操作,確保了操作的原子性
-- 獲取Lua腳本參數(shù):商品ID和要購買的數(shù)量
local productId = KEYS[1]
local amount = tonumber(ARGV[1])
-- 獲取當前庫存
local currentStock = tonumber(redis.call('GET', 'seckill:product:'..productId))
-- 判斷庫存是否充足
if currentStock <= 0 or currentStock < amount then
return 0
end
-- 扣減庫存
redis.call('DECRBY', 'seckill:product:'..productId, amount)
-- 返回成功標志
return 1
然后在秒殺服務(wù)中使用Redis的DefaultRedisScript執(zhí)行l(wèi)ua腳本,完成秒殺
@Component
public class SeckillService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 初始化RedisScript對象
*/
private final DefaultRedisScript<Long> seckillScript = new DefaultRedisScript<>();
{
seckillScript.setLocation(new ClassPathResource("rate_limiter.lua"));
seckillScript.setResultType(Long.class);
}
public boolean seckillyLua(String productId, int amount){
// 設(shè)置Lua腳本參數(shù)
List<String> keys = Collections.singletonList(productId);
List<String> args = Collections.singletonList(Integer.toString(amount));
// 執(zhí)行Lua腳本
Long result = redisTemplate.execute(seckillScript, keys, args);
// 如果執(zhí)行結(jié)果為1,表示秒殺成功
return Objects.equals(result, 1L);
}
}
關(guān)于秒殺場景,我們也可以使用WATCH命令監(jiān)視庫存鍵,然后嘗試獲取并扣減庫存。如果在WATCH之后、EXEC之前庫存發(fā)生了變化,exec方法會返回null,此時我們?nèi)∠鸚ATCH并重新嘗試整個流程,直到成功扣減庫存為止。這樣就實現(xiàn)了基于Redis樂觀鎖的秒殺場景,有效防止了超賣現(xiàn)象。
/**
* 秒殺方法
* @param productId 商品ID
* @param amount 要購買的數(shù)量
* @return 秒殺成功與否
*/
@Transactional(rollbackFor = Exception.class)
public boolean seckilByWatch(String productId, int amount) {
// 樂觀鎖事務(wù)操作
while (true) {
// WATCH指令監(jiān)控庫存鍵
redisTemplate.watch("stock:" + productId);
// 獲取當前庫存
String currentStockStr = redisTemplate.opsForValue().get("stock:" + productId);
if (currentStockStr == null) {
// 庫存不存在,可能是商品已售罄或異常情況
return false;
}
int currentStock = Integer.parseInt(currentStockStr);
// 判斷庫存是否充足
if (currentStock < amount) {
// 庫存不足,取消WATCH并退出循環(huán)
redisTemplate.unwatch();
return false;
}
// 開啟Redis事務(wù)
redisTemplate.multi();
// 執(zhí)行扣減庫存操作
redisTemplate.opsForValue().decrement("stock:" + productId, amount);
// 執(zhí)行其他與秒殺相關(guān)的操作,如增加訂單、更新用戶余額等...
// 提交事務(wù),如果在此期間庫存被其他客戶端修改,則exec返回null
List<Object> results = redisTemplate.exec();
// 如果事務(wù)執(zhí)行成功,跳出循環(huán)
if (!results.isEmpty()) {
return true;
}
}
}
消息隊列與發(fā)布/訂閱
Redis的發(fā)布/訂閱(Pub/Sub)模式,可以實現(xiàn)一個簡單的消息隊列。發(fā)布/訂閱模式允許消息的發(fā)布者(發(fā)布消息)和訂閱者(接收消息)之間解耦,消息的發(fā)布者不需要知道消息的接收者是誰,從而實現(xiàn)了一對多的消息傳遞。
首先我們需要定義一個消息監(jiān)聽器,我們可以實現(xiàn)這個借口并實現(xiàn)其中的方法來處理接收到的消息。這樣可以根據(jù)具體的業(yè)務(wù)需求來定義消息的處理邏輯。
public interface MessageListener {
void onMessage(String channel, String message);
}
然后我們就可以定義消息的生產(chǎn)者以及消費者。publish 方法用于向指定頻道發(fā)布消息,我們使用 RedisTemplate 的 convertAndSend 方法來發(fā)送消息到指定的頻道。而subscribe方法用于訂閱指定的頻道,并設(shè)置消息監(jiān)聽器。當有消息發(fā)布到指定的頻道時,消息監(jiān)聽器會收到消息并進行處理。
@Component
public class MessageQueue {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void publish(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
public void subscribe(String channel, MessageListener listener) {
redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {
listener.onMessage(channel, message);
}, channel.getBytes());
}
}
使用Redis的發(fā)布訂閱模式實現(xiàn)一個輕量級的隊列時要注意:Pub/Sub是非持久化的,一旦消息發(fā)布,沒有訂閱者接收的話,消息就會丟失。還有就是Pub/Sub不適合大規(guī)模的消息堆積場景,因為它不保證消息順序和重復(fù)消費,更適合實時廣播型消息推送。
社交網(wǎng)絡(luò)
在社交網(wǎng)絡(luò)中,Redis可以利用集合(Set)、哈希(Hash)和有序集合(Sorted Set)等數(shù)據(jù)結(jié)構(gòu)構(gòu)建用戶關(guān)系圖譜。
使用哈希(Hash)數(shù)據(jù)結(jié)構(gòu)存儲用戶的個人資料信息,每個用戶對應(yīng)一個哈希表,其中包含用戶的各種屬性,比如用戶名、年齡、性別等。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶資料*/
private static final String USER_PROFILE_PREFIX = "user_profile:";
/**
* 存儲用戶個人資料
* @param userId
* @param profile
*/
public void setUserProfile(String userId, Map<String, String> profile) {
String key = USER_PROFILE_PREFIX + userId;
redisTemplate.opsForHash().putAll(key, profile);
}
/**
* 獲取用戶個人資料
* @param userId
* @return
*/
public Map<Object, Object> getUserProfile(String userId) {
String key = USER_PROFILE_PREFIX + userId;
return redisTemplate.opsForHash().entries(key);
}
}
使用集合(Set)數(shù)據(jù)結(jié)構(gòu)來存儲用戶的好友關(guān)系。每個用戶都有一個集合,其中包含了他的所有好友的用戶ID。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶好友*/
private static final String FRIENDS_PREFIX = "friends:";
/**
* 添加好友關(guān)系
* @param userId
* @param friendId
*/
public void addFriend(String userId, String friendId) {
String key = FRIENDS_PREFIX + userId;
redisTemplate.opsForSet().add(key, friendId);
}
/**
* 獲取用戶的所有好友
* @param userId
* @return
*/
public Set<String> getFriends(String userId) {
String key = FRIENDS_PREFIX + userId;
return redisTemplate.opsForSet().members(key);
}
}
同理,我們還可以實現(xiàn)點贊的業(yè)務(wù)場景
@Service
public class LikeService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 點贊
* @param objectId
* @param userId
*/
public void like(String objectId, String userId) {
// 將點贊人放入zset中
redisTemplate.opsForSet().add(getLikeKey(objectId), userId);
}
/**
* 取消點贊
* @param objectId
* @param userId
*/
public void unlike(String objectId, String userId) {
// 減少點贊人數(shù)
redisTemplate.opsForSet().remove(getLikeKey(objectId), userId);
}
/**
* 是否點贊
* @param objectId
* @param userId
* @return
*/
public Boolean isLiked(String objectId, String userId) {
return redisTemplate.opsForSet().isMember(getLikeKey(objectId), userId);
}
/**
* 獲取點贊數(shù)
* @param objectId
* @return
*/
public Long getLikeCount(String objectId) {
return redisTemplate.opsForSet().size(getLikeKey(objectId));
}
/**
* 獲取所有點贊的用戶
* @param objectId
* @return
*/
public Set<String> getLikedUsers(String objectId) {
return redisTemplate.opsForSet().members(getLikeKey(objectId));
}
private String getLikeKey(String objectId) {
return "likes:" + objectId;
}
}
使用有序集合(Sorted Set)數(shù)據(jù)結(jié)構(gòu)來存儲用戶的關(guān)注者列表。有序集合中的成員是關(guān)注者的用戶ID,而分數(shù)可以是關(guān)注時間或者其他指標,比如活躍度。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶關(guān)注者*/
private static final String FOLLOWERS_PREFIX = "followers:";
/**
* 添加關(guān)注者
* @param userId
* @param followerId
* @param score
*/
public void addFollower(String userId, String followerId, double score) {
String key = FOLLOWERS_PREFIX + userId;
redisTemplate.opsForZSet().add(key, followerId, score);
}
/**
* 獲取用戶的關(guān)注者列表(按照關(guān)注時間排序)
* @param userId
* @return
*/
public Set<String> getFollowers(String userId) {
String key = FOLLOWERS_PREFIX + userId;
return redisTemplate.opsForZSet().range(key, 0, -1);
}
}
除此之外,我們還可以實現(xiàn)可能認識的人,共同好友等業(yè)務(wù)場景。
限流與速率控制
Redis可以精確地實施限流策略,如使用INCR命令結(jié)合Lua腳本實現(xiàn)滑動窗口限流。
創(chuàng)建一個Lua腳本,該腳本負責(zé)檢查在一定時間段內(nèi)請求次數(shù)是否超過限制。
-- rate_limiter.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local timeWindow = tonumber(ARGV[2]) -- 時間窗口,例如單位為秒
-- 獲取當前時間戳
local currentTime = redis.call('TIME')[1]
-- 獲取最近timeWindow秒內(nèi)的請求次數(shù)
local count = redis.call('ZCOUNT', key .. ':requests', currentTime - timeWindow, currentTime)
-- 如果未超過限制,則累加請求次數(shù),并返回true
if count < limit then
redis.call('ZADD', key .. ':requests', currentTime, currentTime)
return 1
else
return 0
end
限流服務(wù)中Redis使用DefaultRedisScript執(zhí)行Lua腳本
@Component
public class RateLimiter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**限流Key*/
private static final String TATE_LIMITER_KEY = "rate-limit:%s";
/**規(guī)定的時間窗口內(nèi)允許的最大請求數(shù)量*/
private static final Integer LIMIT = 100;
/**限流策略的時間窗口長度,單位是秒*/
private static final Integer TIME_WINDOW = 60;
/**
* 初始化RedisScript對象
*/
private final DefaultRedisScript<Long> rateLimiterScript = new DefaultRedisScript<>();
{
rateLimiterScript.setLocation(new ClassPathResource("rate_limiter.lua"));
rateLimiterScript.setResultType(Long.class);
}
/**
* 限流方法 1分鐘內(nèi)最多100次請求
* @param userId
* @return
*/
public boolean allowRequest(String userId) {
String key = String.format(TATE_LIMITER_KEY, userId);
List<String> keys = Collections.singletonList(key);
List<String> args = Arrays.asList(String.valueOf(LIMIT), String.valueOf(TIME_WINDOW));
// 執(zhí)行Lua腳本
Long result = redisTemplate.execute(rateLimiterScript, keys, args);
// 結(jié)果為1表示允許請求,0表示請求被限流
return Objects.equals(result, 1L);
}
}
位運算與位圖應(yīng)用
Redis的位圖(BitMap)是一種特殊的數(shù)據(jù)結(jié)構(gòu),它允許我們在單一的字符串鍵(String Key)中存儲一系列二進制位(bits),每個位對應(yīng)一個布爾值(0或1),并通過偏移量(offset)來定位和操作這些位。位圖極大地節(jié)省了存儲空間,尤其適合于大規(guī)模數(shù)據(jù)的標記、統(tǒng)計和篩選場景。
在位圖中,每一位相當于一個標識符,例如可以用來表示用戶是否在線、商品是否有庫存、用戶是否已讀郵件等。相對于傳統(tǒng)的鍵值對存儲。位圖可以非??焖俚亟y(tǒng)計滿足特定條件的元素個數(shù),如統(tǒng)計在線用戶數(shù)、激活用戶數(shù)等。
@Service
public class UserOnlineStatusService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String ONLINE_STATUS_KEY = "online_status";
private static final String RETENTION_RATE_KEY_PREFIX = "retention_rate:";
private static final String DAILY_ACTIVITY_KEY_PREFIX = "daily_activity:";
/**
* 設(shè)置用戶在線狀態(tài)為在線
* @param userId
*/
public void setUserOnline(long userId) {
redisTemplate.opsForValue().setBit(ONLINE_STATUS_KEY, userId, true);
}
/**
* 設(shè)置用戶在線狀態(tài)為離線
* @param userId
*/
public void setUserOffline(long userId) {
redisTemplate.opsForValue().setBit(ONLINE_STATUS_KEY, userId, false);
}
/**
* 獲取用戶在線狀態(tài)
* @param userId
* @return
*/
public boolean isUserOnline(long userId) {
return redisTemplate.opsForValue().getBit(ONLINE_STATUS_KEY, userId);
}
/**
* 統(tǒng)計在線用戶數(shù)量
* @return
*/
public long countOnlineUsers() {
return getCount(ONLINE_STATUS_KEY);
}
/**
* 記錄用戶的留存情況
* @param userId
* @param daysAgo
*/
public void recordUserRetention(long userId, int daysAgo) {
String key = RETENTION_RATE_KEY_PREFIX + LocalDate.now().minusDays(daysAgo).toString();
redisTemplate.opsForValue().setBit(key, userId, true);
}
/**
* 獲取指定日期的留存率
* @param daysAgo
* @return
*/
public double getRetentionRate(int daysAgo) {
String key = RETENTION_RATE_KEY_PREFIX + LocalDate.now().minusDays(daysAgo).toString();
long totalUsers = countOnlineUsers();
long retainedUsers = getCount(key);
return (double) retainedUsers / totalUsers * 100;
}
/**
* 記錄用戶的每日活躍情況
* @param userId
*/
public void recordUserDailyActivity(long userId) {
String key = DAILY_ACTIVITY_KEY_PREFIX + LocalDate.now().toString();
redisTemplate.opsForValue().setBit(key, userId, true);
}
/**
* 獲取指定日期的活躍用戶數(shù)量
* @param date
* @return
*/
public long countDailyActiveUsers(LocalDate date) {
String key = DAILY_ACTIVITY_KEY_PREFIX + date.toString();
return getCount(key);
}
/**
* 獲取最近幾天每天的活躍用戶數(shù)量列表
* @param days
* @return
*/
public List<Long> getDailyActiveUsers(int days) {
LocalDate currentDate = LocalDate.now();
List<Long> results = Lists.newArrayList();
for (int i = 0; i < days; i++) {
LocalDate date = currentDate.minusDays(i);
String key = DAILY_ACTIVITY_KEY_PREFIX + date.toString();
results.add(getCount(key));
}
return results;
}
/**
* 獲取key下的數(shù)量
* @param key
* @return
*/
private long getCount(String key) {
return (long) redisTemplate.execute((RedisCallback<Long>) connection -> connection.bitCount(key.getBytes()));
}
}
最新列表
Redis的List(列表)是一個基于雙向鏈表實現(xiàn)的數(shù)據(jù)結(jié)構(gòu),允許我們在列表頭部(左端)和尾部(右端)進行高效的插入和刪除操作。LPUSH命令:全稱是LIST PUSH LEFT,用于將一個或多個值插入到列表的最左邊(頭部),在這里用于將最新生成的內(nèi)容ID推送到列表頂部,保證列表中始終是最新的內(nèi)容排在前面。
LTRIM命令用于修剪列表,保留指定范圍內(nèi)的元素,從而限制列表的長度。在這個場景中,每次添加新ID后都會執(zhí)行LTRIM操作,只保留最近的N個ID,確保列表始終保持固定長度,即只包含最新的內(nèi)容ID。
@Service
public class LatestListService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String LATEST_LIST_KEY = "latest_list";
/**
* 添加最新內(nèi)容ID到列表頭部
* @param contentId 內(nèi)容ID
*/
public void addLatestContent(String contentId) {
ListOperations<String, String> listOps = redisTemplate.opsForList();
listOps.leftPush(LATEST_LIST_KEY, contentId);
// 限制列表最多存儲N個ID,假設(shè)N為100
listOps.trim(LATEST_LIST_KEY, 0, 99);
}
/**
* 獲取最新的N個內(nèi)容ID
* @param count 要獲取的數(shù)量,默認為10
* @return 最新的內(nèi)容ID列表
*/
public List<String> getLatestContentIds(int count) {
ListOperations<String, String> listOps = redisTemplate.opsForList();
return listOps.range(LATEST_LIST_KEY, 0, count - 1);
}
}
抽獎
借助Redis的Set數(shù)據(jù)結(jié)構(gòu)以及其內(nèi)置的Spop命令,我們能夠高效且隨機地選定抽獎獲勝者。Set作為一種不允許包含重復(fù)成員的數(shù)據(jù)集合,其特性天然適用于防止抽獎過程中出現(xiàn)重復(fù)參與的情況,確保每位參與者僅擁有一個有效的抽獎資格。
由于Set內(nèi)部元素的排列不具備確定性,這意味著在對集合執(zhí)行隨機獲取操作時,每一次選取都將獨立且不可預(yù)測,這與抽獎活動中所要求的隨機公平原則高度契合。
Redis的Spop命令允許我們在單個原子操作下,不僅隨機選取,還會從Set中移除指定數(shù)量(默認為1)的元素。這一原子操作機制尤為關(guān)鍵,在高并發(fā)環(huán)境下,即便有多個請求同時進行抽獎,Spop也能夠確保同一時刻只有一個請求能成功獲取并移除一個元素,有效避免了重復(fù)選擇同一位參與者作為獲獎?wù)叩目赡苄浴?/p>
@Service
public class LotteryService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String PARTICIPANTS_SET_KEY = "lottery:participants";
/**
* 添加參與者到抽獎名單
* @param participant 參與者ID
*/
public void joinLottery(String participant) {
redisTemplate.opsForSet().add(PARTICIPANTS_SET_KEY, participant);
}
/**
* 抽取一名幸運兒
* @return 幸運兒ID
*/
public String drawWinner() {
// 使用Spop命令隨機抽取一個參與者
return redisTemplate.opsForSet().pop(PARTICIPANTS_SET_KEY);
}
/**
* 抽取N個幸運兒
* @param count 抽取數(shù)量
* @return 幸運兒ID列表
*/
public List<String> drawWinners(int count) {
return redisTemplate.opsForSet().pop(PARTICIPANTS_SET_KEY, count);
}
}
Stream類型
Redis Stream作為一種自Redis 5.0起引入的高級數(shù)據(jù)結(jié)構(gòu),專為存儲和處理有序且持久的消息流而設(shè)計。可視作一個分布式的、具備持久特性的消息隊列,通過唯一的鍵名來標識每個Stream,其中容納了多個攜帶時間戳和唯一標識符的消息實體。
每條存儲于Stream中的消息都具有全球唯一的message ID,該ID內(nèi)嵌時間戳和序列編號,旨在確保即使在復(fù)雜的集群部署中仍能保持消息的嚴格時序性。這些消息內(nèi)容會持久存儲在Redis中,確保即使服務(wù)器重啟也能安全恢復(fù)。
生產(chǎn)者利用XADD指令將新消息添加到Stream中,而消費者則通過XREAD或針對多消費者組場景優(yōu)化的XREADGROUP命令來讀取并處理消息。XREADGROUP尤其擅長處理多消費者組間的公平分配和持久訂閱,確保消息的公正、有序送達各個消費者。
Stream核心特性之一是支持消費者組機制,消費者組內(nèi)的不同消費者可獨立地消費消息,并通過XACK命令確認已消費的消息,從而實現(xiàn)了消息的持久化消費和至少一次(at-least-once)交付保證。當消息量超出消費者處理能力時,未處理的消息可在Stream中積壓,直到達到預(yù)設(shè)的最大容量限制。此外,還能設(shè)定消息的有效期(TTL),逾期未被消費的消息將自動剔除。即使在網(wǎng)絡(luò)傳輸過程中消息遭受損失,亦可通過message ID保障消息的冪等性重新投遞。盡管網(wǎng)絡(luò)條件可能導(dǎo)致消息到達消費者的時間順序與生產(chǎn)者發(fā)出的順序有所偏差,但Stream機制確保了每個消息在其內(nèi)在的時間上下文中依然保持著嚴格的順序關(guān)系。
Redis Stream作為一個集消息持久化、多消費者公平競爭、消息追溯和排序等功能于一體的強大消息隊列工具,已在日志采集、實時數(shù)據(jù)分析、活動追蹤等諸多領(lǐng)域展現(xiàn)出卓越的適用性和價值。
@Component
public class LogCollector {
private static final String LOGS_STREAM_KEY = "logs";
private static final String GROUP_NAME = "log_consumers";
private static final String CONSUMER_NAME = "log_consumer";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 發(fā)送日志事件至 Redis Stream
public void sendLogEvent(String message, Map<String, String> attributes) {
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
RecordId messageId = streamOperations.add(StreamRecords.newRecord()
.ofStrings(attributes)
.withStreamKey(LOGS_STREAM_KEY));
}
// 實時消費日志事件
public StreamRecords<String, String> consumeLogs(int batchSize) {
Consumer consumer = Consumer.from(CONSUMER_NAME, GROUP_NAME);
StreamOffset<String> offset = StreamOffset.create(LOGS_STREAM_KEY, ReadOffset.lastConsumed());
StreamReadOptions<String, String> readOptions = StreamReadOptions.empty().count(batchSize);
return redisTemplate.opsForStream().read(readOptions, StreamOffset.create(LOGS_STREAM_KEY, ReadOffset.lastConsumed()), consumer);
}
}
GEO類型
Redis的GEO數(shù)據(jù)類型自3.2版本起引入,專為存儲和高效操作含有經(jīng)緯度坐標的地理位置信息而設(shè)計。開發(fā)人員利用這一類型可以輕松管理地理位置數(shù)據(jù),同時兼顧內(nèi)存效率和響應(yīng)速度。
利用GEOADD命令,可以將帶有精確經(jīng)緯度坐標的數(shù)據(jù)點歸檔至指定鍵名下的集合中。
可借助GEOPOS命令獲取某一成員的具體經(jīng)緯度坐標。
通過GEODIST命令,可以準確計算任意兩個地理位置成員之間的地球表面距離,支持多種計量單位,包括米、千米、英里和英尺。
使用GEORADIUS命令,系統(tǒng)可以根據(jù)指定的經(jīng)緯度中心點及半徑范圍檢索出處于該區(qū)域內(nèi)的所有成員地理位置。
GEORADIUSBYMEMBER命令也用于范圍查詢,但其查詢依據(jù)是選定成員自身的位置,以此為圓心劃定搜索范圍。
GEO類型在許多場景下都非常有用,例如移動應(yīng)用中的附近好友查找、商店位置搜索、物流配送中的最近司機調(diào)度等。
@Service
public class FriendService {
private static final String FRIEND_LOCATIONS_KEY = "friend_locations";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private GeoOperations<String, FriendLocation> geoOperations; // 自動裝配GeoOperations
public void saveFriendLocation(FriendLocation location) {
geoOperations.add(FRIEND_LOCATIONS_KEY, location.getLongitude(), location.getLatitude(), location);
}
public List<FriendLocation> findFriendsNearby(double myLongitude, double myLatitude, Distance radius) {
Circle circle = new Circle(new Point(myLongitude, myLatitude), radius);
return geoOperations.radius(FRIEND_LOCATIONS_KEY, circle, Metric.KILOMETERS).getContent();
}
}
總結(jié)
Redis作為一款高性能、內(nèi)存型的NoSQL數(shù)據(jù)庫,憑借其豐富的數(shù)據(jù)結(jié)構(gòu)、極高的讀寫速度以及靈活的數(shù)據(jù)持久化策略,在現(xiàn)代分布式系統(tǒng)中扮演著至關(guān)重要的角色。它的關(guān)鍵價值體現(xiàn)在以下幾個方面:
- 緩存優(yōu)化:Redis將頻繁訪問的數(shù)據(jù)存儲在內(nèi)存中,顯著減少了數(shù)據(jù)庫的讀取壓力,提升了系統(tǒng)的整體性能和響應(yīng)速度。
- 分布式支持:通過主從復(fù)制、哨兵和集群模式,Redis實現(xiàn)了高度可擴展性和高可用性,滿足大規(guī)模分布式系統(tǒng)的需求。
- 數(shù)據(jù)結(jié)構(gòu)多樣性:Redis支持字符串、哈希、列表、集合、有序集合、Bitmaps、HyperLogLog、Geo等多樣化的數(shù)據(jù)結(jié)構(gòu),為多種應(yīng)用場景提供了便利,如排行榜、社交關(guān)系、消息隊列、計數(shù)器、限速器等。
- 實時處理與分析:隨著Redis 5.0引入Stream數(shù)據(jù)結(jié)構(gòu),使得Redis在日志收集、實時分析、物聯(lián)網(wǎng)數(shù)據(jù)流處理等方面有了更多的可能性。
- 地理位置服務(wù):GEO類型提供了便捷的空間索引和距離計算功能,使得Redis能夠在電商、出行、社交等領(lǐng)域提供附近地點搜索、路線規(guī)劃等服務(wù)。