位置:科技大田大数据产业专题>>资讯>>市场>>内容阅读
Redis 学习之一招击穿自己的系统,附送 N 个击穿解决大礼包 | 原力计划
原标题:Redis 学习之一招击穿自己的系统,附送 N 个击穿解决大礼包 | 原力计划

作者 | Mark_MMXI

来源 | CSDN博客,责编 | 夕颜

出品 | CSDN(ID:CSDNnews)

缓存的存在是为了在高并发情形下,缓解DB压力,提高业务系统体验。业务系统访问数据,先去缓存中进行查询,假如缓存存在数据直接返回缓存数据,否则就去查询数据库再返回值。

Redis是一种缓存工具,是一种缓存解决方案,但是引入Redis又有可能出现缓存穿透、缓存击穿、缓存雪崩等问题。本文就对缓存雪崩问题进行较深入剖析,并通过场景模型加深理解,基于场景使用对应的解决方案尝试解决。

缓存原理及Redis解决方案

首先,我们来看一下缓存的工作原理图:

Redis 本质上是一个 Key-Value 类型的内存数据库。因为是纯内存操作,Redis 的性能非常出色,每秒可以处理超过 10、万次读写操作。Redis 还有一个优势就是是支持保存多种数据结构,例如 String、List、Set、Sorted Set、hash等。

缓存雪崩

2.1 缓存雪崩解释

缓存雪崩的情况是说,当某一时刻发生大规模的缓存失效的情况,比如你的缓存服务宕机了,DB直接负载大量请求压力导致挂掉。

2.2 模拟缓存雪崩

按照缓存雪崩的解释,其实我们要模拟,只需要达到以下几个点:

  • 同一时刻大规模缓存失效。
  • 失效的时刻有大量的查询请求冲击DB
    @Test publicvoidtestQuery{ ExecutorService es = Executors.newFixedThreadPool( 10); intloop= 1000; intinit= 2000; //查询1k个key放进缓存 for( inti = init; i userService.queryById(i); } //缓存过期时间为1s,等待1s同时过期 try{ Thread.sleep( 1000); } catch(Exception e){ e.printStackTrace; } //开始了使用多线程疯狂查询 for( inti = 0; i 100; i++) { es.execute( -> { for( intk = init; k userService.queryById(k); } }); } }

    为了加快崩坏的速度,把数据库的最大连接数调整成5,同时增大数据库表的数据量达到百万级别。

    然后执行测试程序,很快程序就报错并停止,详细错误如下:

    Exceptionin thread "pool-1-thread-12" org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection is closed atorg.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) atorg.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) atorg.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) atorg.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) atorg.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270) atorg.springframework.data.redis.connection.lettuce.LettuceStringCommands.convertLettuceAccessException(LettuceStringCommands.java:799) atorg.springframework.data.redis.connection.lettuce.LettuceStringCommands.get(LettuceStringCommands.java:68) atorg.springframework.data.redis.connection.DefaultedRedisConnection.get(DefaultedRedisConnection.java:260) atorg.springframework.data.redis.cache.DefaultRedisCacheWriter.lambda$get$1(DefaultRedisCacheWriter.java:109) atorg.springframework.data.redis.cache.DefaultRedisCacheWriter.execute(DefaultRedisCacheWriter.java:242) atorg.springframework.data.redis.cache.DefaultRedisCacheWriter.get(DefaultRedisCacheWriter.java:109) atorg.springframework.data.redis.cache.RedisCache.lookup(RedisCache.java:88) atorg.springframework.cache.support.AbstractValueAdaptingCache.get(AbstractValueAdaptingCache.java:58) atorg.springframework.cache.interceptor.AbstractCacheInvoker.doGet(AbstractCacheInvoker.java:73) atorg.springframework.cache.interceptor.CacheAspectSupport.findInCaches(CacheAspectSupport.java:554) atorg.springframework.cache.interceptor.CacheAspectSupport.findCachedItem(CacheAspectSupport.java:519) atorg.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:401) atorg.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:345) atorg.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:61) atorg.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) atorg.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747) atorg.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) atcom.example.demo.user.service.impl.UserServiceImpl$$EnhancerBySpringCGLIB$$ba6638d2.queryById() atcom.example.demo.DemoApplicationTests$1.run(DemoApplicationTests.java:55) atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) atjava.lang.Thread.run(Thread.java:748) Causedby: io.lettuce.core.RedisException: Connection is closed atio.lettuce.core.protocol.DefaultEndpoint.validateWrite(DefaultEndpoint.java:195) atio.lettuce.core.protocol.DefaultEndpoint.write(DefaultEndpoint.java:137) atio.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:112) 2020-03-0822:31:14.432 ERROR 37892 --- [eate-1895102622] com.alibaba.druid.pool.DruidDataSource : create connection SQLException, url: jdbc:mysql://localhost:3306/redis_demo?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC, errorCode 1040, state 08004

    java.sql.SQLNonTransientConnectionException: Data source rejected establishment of connection, message from server: "Too many connections"

    主要问题出在数据库连接已经满了,无法获取数据库连接进行查询,这个现象是就是缓存雪崩的效果。

    2.3 解决缓存雪崩

    2.3.1 分析雪崩场景

    用图来说,实际上就是没有了redis这层担着上层流量压力

    其实从这张图来看,对于我们一般的应用,客户端去访问应用到数据库的整个链路过程,其实在面临大流量的时候,我们一般是以"倒三角"模型进行流量缓冲,什么是“倒三角”模型

    通过"倒三角"模型,按照并发需要优化系统,在面临雪崩这种情形,可以按照“倒三角”模型进行优化,注意雪崩是理论上没办法彻底解决的,可能到最终得提高硬件配置。

    2.3.1 雪崩优化方案

    经过分析得解决雪崩方案:

    1.随机缓存过期时间,能一定程度缓解雪崩2.使用锁或队列、设置过期标志更新缓存3.添加本地缓存实现多级缓存4.添加熔断降级限流,缓冲压力

    2.3.1.1 随机缓存时间

    随机缓存时间意在避免大量热点key同时失效。

    接下来,我们基于 Redis+SpringBoot+SpringCache基础项目搭建这个项目继续进行实践。

    由于是使用了SpringCache,我们最优的方案就是直接在@Cacheable等注解上面加参数,比如像表达式之类的,让数据放进缓存的时候按照表达式/参数值定义过期时间。

    因此我们先查看原有的RedisCache是怎么样的put逻辑

    RedisCacheManager创建Cache

    protectedRedisCache createRedisCache(String name, @NullableRedisCacheConfiguration cacheConfig) { returnnew RedisCache(name, this.cacheWriter, cacheConfig != null? cacheConfig : this.defaultCacheConfig); }

    打开RedisCache.class,查看put 方法如下:

    publicvoid put(Object key, @NullableObject value) { Object cacheValue = this.preProcessCacheValue(value); if(! this.isAllowNullValues && cacheValue == null) { thrownew IllegalArgumentException(String.format( "Cache '%s' does not allow 'null' values. Avoid storing null via '@Cacheable(unless="#result == null")' or configure RedisCache to allow 'null' via RedisCacheConfiguration.", this.name)); } else{ this.cacheWriter.put( this.name, this.createAndConvertCacheKey(key), this.serializeCacheValue(cacheValue), this.cacheConfig.getTtl); }}

    这里this.cacheConfig.getTtl 就是缓存的过期时间,可以看到数据的缓存过期时间是从全局缓存配置里面获取的过期时间配置的,而我需要实现的是让某个cache下每个key随机时间过期,因此我们需要改动这里 this.cacheConfig.getTtl,我们在createRedisCache的时候改变这个值就行了。

    1. 基于java动态执行字符串代码,返回过期时间。

    实现基于Spring.expression的ExpressService

    /**

    * @title: ExpressUtil * @projectNameredisdemo * @deion: 动态执行字符串代码 * @authorlps * @date2020/3/912:01 */@Slf4j publicclassExpressService{

    privateExpressionParser spelExpressionParser;

    privateParserContext parserContext; // 表达式解析上下文privateStandardEvaluationContext evaluationContext;

    publicstaticenumExpressType { /*** ${}表达式格式*/TYPE_FIRST,/*** #{}表达式格式*/TYPE_SECOND}privatestaticfinalString PRE_TYPE_1 = "${"; privatestaticfinalString PRE_TYPE_2 = "#{"; privatestaticfinalString SUF_STR = "}";

    privateExpressService(String pre, String suf){ spelExpressionParser = newSpelExpressionParser; log.debug( "表达式前缀={},表达式后缀={}", pre, suf); evaluationContext = newStandardEvaluationContext; // 增加map解析方案evaluationContext.addPropertyAccessor( newMapAccessor); parserContext = newTemplateParserContext(pre, suf); }

    /****

    * 创建表达式处理服务对象 默认为创建#{}格式表达式 通过ExpressType指定表达式格式,现有两种${}和#{}*

    *** @paramtype * 表达式格式类型* @return表达式解析对象 */publicstaticExpressService createExpressService(ExpressType type){ if(type == ExpressType.TYPE_FIRST) { log.debug( "生成表达式,表达式前缀={}", PRE_TYPE_1); returnnewExpressService(PRE_TYPE_1, SUF_STR); } else{ returnnewExpressService(PRE_TYPE_2, SUF_STR); }

    }

    publicObject expressParse(String express, Object data)throwsException { log.debug( "解析表达式信息={}", express); Expression expression = spelExpressionParser.parseExpression(express, this.parserContext); returnexpression.getValue(evaluationContext, data); }

    }

    测试调用:

    @ Test

    publicvoidtestExpress( ) { ExpressService express = ExpressService.createExpressService( null); try{ //固定超时时间System. out.println( "ttl="+express.expressParse( "#{60}", null)); //调用方法生成随机过期时间System. out.println( "ttl="+express.expressParse( "#{T(mons.lang3.RandomUtils).nextInt(60,200)}", null)); } catch(Exception e) { e.printStackTrace;}}

    2. 设计name拼接ttl规则

    由于createRedisCache只有两个参数name以及cacheConfig,而只有name是对于单个cache来说的,cacheConfig是对于全局cache来说,因此我们需要设计name参数中指定cache的name以及过期时间的规则。

    name赋值规则:name|ttlFun

    eg: @ Cacheable( cacheName=" test|#{ T( mons.lang3.RandomUtils) .nextInt(60,200)}") @ Cacheable( cacheName=" test|#{60}")

    3. 编写解析name代码

    /*** 分隔符|*/privatestaticfinal StringSEPERATE_LINE = "|"; publicMyRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) { super(cacheWriter, defaultCacheConfiguration); }

    protectedRedisCache createRedisCache( Stringname, @NullableRedisCacheConfiguration cacheConfig) { // ``name赋值规则:name|ttlFun ``if(name.contains(SEPERATE_LINE)){ StringcacheName = name.substring( 0,name.indexOf(SEPERATE_LINE)); Stringexpression = name.substring(name.indexOf(SEPERATE_LINE)+ 1); try{ ExpressService express = ExpressService.createExpressService( null); long ttl = Long.parseLong(express.expressParse(expression, null).toString); cacheConfig = cacheConfig.entryTtl(Duration.ofSeconds(ttl));returnsuper.createRedisCache(cacheName, cacheConfig);

    } catch(Exception e){ e.printStackTrace;returnsuper.createRedisCache(name, cacheConfig); }

    }returnsuper.createRedisCache(name, cacheConfig);

    }

    4. 修改CacheConfig

    将原本的RedisManager替换成#3编写的MyRedisManager

    /**

    * 配置缓存管理器*/@ BeanpublicCacheManager cacheManager( RedisConnectionFactory factory) { //关键点,spring cache 的注解使用的序列化都从这来,没有这个配置的话使用的jdk自己的序列化,实际上不影响使用,只是打印出来不适合人眼识别RedisCacheConfiguration cacheConfig = RedisCacheConfiguration.defaultCacheConfig// 将 key 序列化成字符串.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer( newStringRedisSerializer)) // 将 value 序列化成 json.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer( newGenericJackson2JsonRedisSerializer)) // 设置缓存过期时间,单位秒.entryTtl(Duration.ofSeconds(cacheExpireTime))// 不缓存空值.disableCachingNullValues;/* RedisCacheManager redisCacheManager = RedisCacheManager.builder(factory).cacheDefaults(cacheConfig).build;*///修改RedisCacheManager 为MyRedisCacheManager MyRedisCacheManager redisCacheManager = newMyRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(factory), cacheConfig); returnredisCacheManager; }

    5. 测试

    编写单元测试

    @ Test

    public void testQueryIdWithExpress{Assert.assertNotNull( userService.queryById(3333)); }

    重新定义查询的cache定义

    @Override

    @Cacheable( value = "ca1|#{60}",key = "#id", unless= "#result == null") // @Cacheable( value = "ca1|#{T(mons.lang3.RandomUtils).nextInt(100,200)}",key = "#id", unless= "#result == null") public User queryById( intid) { returnthis.userDao.queryById(id); }

    当value=ca1|#{60}的时候,通过查看Redis的TTL 剩余为58s

    当value=ca1|#{T(mons.lang3.RandomUtils).nextInt(100,200)}的时候,随机100-220范围内秒数,通过查看Redis的TTL 剩余为107s

    这时候使用random的方式就可以实现随机过期时间了,随机数最好选择符合高斯(正态)分布的会比较好。

    newRandom.nextGaussian

    2.3.1.2 互斥锁排队

    业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。这里要注意,分布式环境中要使用分布式锁,单机的话用普通的锁(synchronized、Lock)就够了。

    这样做思路比较清晰,也从一定程度上减轻数据库压力,但是锁机制使得逻辑的复杂度增加,吞吐量也降低了,有点治标不治本。

    1.使用setnx的方式设置互斥锁

    publicUser queryById( intid ) { try{ if(redisTemplate.hasKey(id+ "")) { return(User) redisTemplate.opsForValue. get(id+ ""); } else{ //获取锁if( lock(id+ "")){ // 数据库查询User user = userDao.queryById(id);redisTemplate.opsForValue. set(id+ "",user, Duration.ofSeconds( 3000)); //释放锁redisTemplate.delete(LOCK_PREFIX + "id"); }}} catch(Exception e) { e.printStackTrace;}return(User) redisTemplate.opsForValue. get( ""+id); }privatestaticString LOCK_PREFIX = "prefix"; privatestaticlongLOCK_EXPIRE = 3000; /*** 互斥锁实现*/publicboolean lock( String key) { String lock= LOCK_PREFIX + key; return(Boolean) redisTemplate.execute((RedisCallback) connection -> { longexpireAt = System.currentTimeMillis + LOCK_EXPIRE + 1; //SETNXBoolean acquire = connection.setNX( lock.getBytes, String.valueOf(expireAt).getBytes); if(acquire) { returntrue; } else{ byte[] value= connection. get( lock.getBytes); if(Objects.nonNull( value) && value.length > 0) { longexpireTime = Long.parseLong( newString( value)); //判断锁是否过期 if(expireTime byte[] oldValue = connection.getSet( lock.getBytes, String.valueOf(System.currentTimeMillis + LOCK_EXPIRE + 1).getBytes); returnLong.parseLong( newString(oldValue)) }}}returnfalse; });}

    2.3.1.3 设置过期标志更新缓存

    定时更新缓存,阻塞部分请求,达到缓冲作用,也可以设置key永不过期

    2.3.1.4 多级缓存

    这个方案主要在redis宕机,或者key在更新进缓存的中间,可以响应业务应用,减轻压力

    2.3.1.5 熔断降级限流

    这个方案是直接在业务应用之上进行请求流量控制,减轻下层压力

    【End】 返回搜狐,查看更多

    责任编辑:

    声明:该文观点仅代表作者本人,搜狐号系信息发布平台,搜狐仅提供信息存储空间服务。
    免责声明:本网站部 分文章和信息来源于互联网,本网转载出于传递更多信息和学习之目的,并不意味着赞同其观点或证实其内容的真实性。如转载稿涉及版权等问题,请立即联系管理 员,我们会予以更改或删除相关文章,保证您的权利。对使用本网站信息和服务所引起的后果,本网站不作任何承诺。
    Copyright 版权所有 Copyright 2013-2014 福建省云创集成科技服务有限公司
    All Rights Reserved. 运营维护:三明市明网网络信息技术有限公司 业务咨询:0598-8233595 0598-5831286 技术咨询:0598-8915168