When Inspiration Strikes: Implementing a Delay Queue for a Business Scenario with Redis
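One of our product requirements included the following scenario. For a work order, a specialist can either suspend it (2 hours for the first suspension, 12 hours for the second) or store it (the number of days is entered in a form). After the work order is submitted and the scheduled time arrives, it must be claimed again and re-reviewed. Work orders also carry a priority: the higher the priority, the earlier the work order must be dispatched. A scenario like this can be solved with Redis.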
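1. Business Overview
We assume two queues: one holds immediate (active) work orders and the other holds suspended work orders. For a suspend operation, we set a Redis key with a time-to-live. When the key expires, the client, which listens for expiration events, fetches the work order, removes it from the suspended queue, and enqueues it in the immediate queue.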
[Figure: business flow diagram]
The queues can be stored as Redis ZSets. A ZSet is a sorted set whose members are kept in order by score.
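To make the ordering behaviour concrete, here is a minimal in-memory sketch that imitates a ZSet with a `TreeSet`. It is not Redis code: `ZSetSketch` and its methods are hypothetical names used only for illustration, and the tie-break on the member name mirrors how Redis orders members with equal scores lexicographically.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** In-memory stand-in for a Redis ZSet: unique members ordered by score (hypothetical helper). */
class ZSetSketch {
    private final Map<String, Double> scores = new HashMap<>();
    // Order by score first, then by member name so equal scores stay distinct.
    private final TreeSet<String> ordered = new TreeSet<>(
            Comparator.comparingDouble((String m) -> scores.get(m))
                      .thenComparing(Comparator.naturalOrder()));

    /** Like ZADD: returns true if the member is new, false if only its score changed. */
    boolean add(String member, double score) {
        boolean isNew = !scores.containsKey(member);
        if (!isNew) {
            ordered.remove(member); // must be removed while the old score is still in place
        }
        scores.put(member, score);
        ordered.add(member);
        return isNew;
    }

    /** Like ZRANGE key 0 0: the member with the lowest score, or null if empty. */
    String first() {
        return ordered.isEmpty() ? null : ordered.first();
    }

    int size() {
        return ordered.size();
    }
}
```

Note that the queue manager in this article dequeues the member with the lowest score, so a work order that should be dispatched first needs the smallest score.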
2.1.1 Define the context class (WorkOrderContext)
/**
 * @description: Work order context object
 * @Date: 2020/7/13 4:28 PM
 * @Author: 石冬冬-SeigHeil
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WorkOrderContext {
    /** Whether this context is reserved for testers. */
    private boolean isTest;
    /** The work order. */
    private WorkOrder worOrder;
    /** Queue type. */
    private QueueType queueType;

    /** Creates a context for the immediate queue (dispatch right away). */
    public static WorkOrderContext buildImmediate() {
        return WorkOrderContext.builder().queueType(QueueType.immediate).build();
    }

    /** Creates a context for the suspended queue (runs after n hours). */
    public static WorkOrderContext buildSuspended() {
        return WorkOrderContext.builder().queueType(QueueType.suspended).build();
    }

    /** Creates a context for the stored queue (runs after n days). */
    public static WorkOrderContext buildStored() {
        return WorkOrderContext.builder().queueType(QueueType.stored).build();
    }

    /**
     * Creates a context for the immediate queue (dispatch right away).
     *
     * @param workCode work order number
     * @param priority priority, used as the ZSet score
     */
    public static WorkOrderContext buildImmediate(String workCode, double priority) {
        WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(0).build();
        return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.immediate).build();
    }

    /**
     * Creates a context for the suspended queue (runs after n hours).
     *
     * @param workCode    work order number
     * @param priority    priority, used as the ZSet score
     * @param delayedTime delay deadline
     */
    public static WorkOrderContext buildSuspended(String workCode, double priority, long delayedTime) {
        WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(delayedTime).build();
        return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.suspended).build();
    }

    /**
     * Creates a context for the stored queue (runs after n days).
     *
     * @param workCode    work order number
     * @param priority    priority, used as the ZSet score
     * @param delayedTime delay deadline
     */
    public static WorkOrderContext buildStored(String workCode, double priority, long delayedTime) {
        WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(delayedTime).build();
        return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.stored).build();
    }

    /** Queue type. */
    public enum QueueType {
        /** Immediate queue (dispatch right away). */
        immediate,
        /** Suspended queue (runs after n hours). */
        suspended,
        /** Stored queue (runs after n days). */
        stored
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    public static class WorkOrder {
        /** Work order number. */
        private String workCode;
        /** Priority. */
        private double priority;
        /** Delay time. */
        private long delayedTime;
    }
}

2.1.2 Define the abstract cache class (AbstractCacheManager)
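This abstract class defines a single method, which wraps an action in a retry loop. The injected RedisService bean is our own wrapper around the Redis API and is available to subclasses.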
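The retry idea can be shown in isolation. The following is a simplified sketch, not the article's class: `RetrySketch` is a hypothetical name, it has no logging or sleep between attempts, and unlike the original it returns whether the action eventually succeeded.

```java
import java.util.function.Function;

/** Simplified, dependency-free retry loop (hypothetical helper for illustration). */
class RetrySketch {
    /**
     * Calls the action until it reports success or the attempts run out.
     *
     * @param maxAttempts maximum number of attempts
     * @param call        receives the 1-based attempt number, returns true on success
     * @return true if any attempt succeeded
     */
    static boolean retry(int maxAttempts, Function<Integer, Boolean> call) {
        int attempt = 0;
        boolean done = false;
        while (attempt < maxAttempts && !done) {
            attempt++;
            try {
                // Boolean.TRUE.equals also treats a null result as failure.
                done = Boolean.TRUE.equals(call.apply(attempt));
            } catch (Exception e) {
                // A thrown exception counts as a failed attempt; keep looping.
                done = false;
            }
        }
        return done;
    }
}
```

A call such as `RetrySketch.retry(3, attempt -> attempt >= 2)` succeeds on the second attempt and stops there.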
/**
 * @description: Abstract cache manager
 * @Date: 2020/7/18 9:41 PM
 * @Author: 石冬冬-SeigHeil
 */
@Slf4j
public abstract class AbstractCacheManager {
    final int MAX_RETRIES = 3;

    @Autowired
    RedisService redisService;

    /**
     * Runs an action with retries.
     *
     * @param retries maximum number of attempts
     * @param context context object (used for logging)
     * @param call    the action; receives the attempt number and returns whether it succeeded
     */
    public <T> void retry(int retries, T context, Function<Integer, Boolean> call) {
        boolean done = false;
        int retry = 1;
        do {
            try {
                done = call.apply(retry);
                log.info("[retry] context={}, retry={}, done={}", JSONObject.toJSON(context), retry, done);
                retry++;
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (Exception e) {
                log.error("[retry] exception, ctx={}", JSONObject.toJSON(context), e);
                retry++;
            }
        } while (retry <= retries && !done);
    }
}

2.1.3 Define the Redis cache manager class (WorkOrderCacheManager)
This class stores values as Redis Strings, so that each entry can be given an expiration time.
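The key naming convention used in this section (for example `carCarthage:stored_cache_` followed by the work order number) can be sketched without Redis. `CacheKeySketch` is a hypothetical helper; the prefix values come from the article's example key and its CacheType enum, and the parsing mirrors the listener's `getWorkCode`, which takes everything after the last underscore.

```java
/** Builds and parses cache keys following the article's naming rule (hypothetical helper). */
class CacheKeySketch {
    static final String BIZ_PREFIX = "carCarthage:";     // business prefix from the article's example
    static final String STORED = "stored_cache_";        // CacheType.stored_cache key
    static final String SUSPENDED = "suspended_cache_";  // CacheType.suspended_cache key

    /** Joins business prefix, cache-type prefix, and work order number. */
    static String buildKey(String typePrefix, String workCode) {
        return BIZ_PREFIX + typePrefix + workCode;
    }

    /** Recovers the work order number: everything after the last underscore. */
    static String workCodeOf(String redisKey) {
        return redisKey.substring(redisKey.lastIndexOf('_') + 1);
    }

    /** Maps an expired key to a queue type name, or null if the key is unrelated. */
    static String queueTypeOf(String expiredKey) {
        if (expiredKey.startsWith(BIZ_PREFIX + STORED)) {
            return "stored";
        }
        if (expiredKey.startsWith(BIZ_PREFIX + SUSPENDED)) {
            return "suspended";
        }
        return null;
    }
}
```

One consequence of parsing by the last underscore is that a work order number that itself contains an underscore would be truncated, so this scheme assumes work order numbers never contain one.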
The static nested class CacheValue is the value stored in the Redis String. The nested enum CacheType holds the business prefixes of the cache keys. Note the naming rule for the Redis String keys, for example: carCarthage:stored_cache_<work order number>.

/**
 * @description: Work order cache manager
 * @Date: 2020/7/14 4:28 PM
 * @Author: 石冬冬-SeigHeil
 */
@Component
@Slf4j
public class WorkOrderCacheManager extends AbstractCacheManager {
    /**
     * Stores the cache value and sets its expiration.
     *
     * @param cache the value to store
     */
    public void setCacheInExpire(CacheValue cache) {
        retry(MAX_RETRIES, cache, idx -> {
            String redisKey = redisService.getKey(getRedisKeySuffix(cache.getType(), cache.getWorkCode()));
            redisService.set(redisKey, JSONObject.toJSONString(cache), cache.getExpireSeconds());
            log.info("[setCacheInExpire], redisKey={}, CacheValue={}", redisKey, JSONObject.toJSONString(cache));
            return Boolean.TRUE;
        });
    }

    /**
     * Looks up the cached value for a work order number.
     *
     * @param cacheType cache type {@link CacheType}
     * @param workCode  work order number
     */
    public CacheValue get(CacheType cacheType, String workCode) {
        String redisKey = redisService.getKey(getRedisKeySuffix(cacheType, workCode));
        String value = redisService.get(redisKey, String.class);
        return JSONObject.parseObject(value, CacheValue.class);
    }

    /**
     * Builds the Redis key suffix for a cache type and work order number.
     *
     * @param cacheType cache type {@link CacheType}
     * @param workCode  work order number
     */
    String getRedisKeySuffix(CacheType cacheType, String workCode) {
        switch (cacheType) {
            case stored_cache:
                return CacheType.stored_cache.getKey() + workCode;
            case suspended_cache:
                return CacheType.suspended_cache.getKey() + workCode;
            default:
                break;
        }
        return null;
    }

    /** Cached value. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    public static class CacheValue {
        /** Cache type. */
        private CacheType type;
        /** Work order number. */
        private String workCode;
        /** Priority. */
        private double priority;
        /** Delay deadline (timestamp). */
        private long delayedTime;
        /** Cache expiration (in seconds). */
        private long expireSeconds;

        /** Creates a value for the suspended queue (runs after n hours). */
        public static CacheValue buildSuspended(String workCode, double priority, long delayedTime, long expireSeconds) {
            return CacheValue.builder()
                    .type(CacheType.suspended_cache)
                    .workCode(workCode)
                    .priority(priority)
                    .delayedTime(delayedTime)
                    .expireSeconds(expireSeconds)
                    .build();
        }

        /** Creates a value for the stored queue (runs after n days). */
        public static CacheValue buildStored(String workCode, double priority, long delayedTime, long expireSeconds) {
            return CacheValue.builder()
                    .type(CacheType.stored_cache)
                    .workCode(workCode)
                    .priority(priority)
                    .delayedTime(delayedTime)
                    .expireSeconds(expireSeconds)
                    .build();
        }
    }

    /** Cache key prefixes for suspended and stored work orders. */
    @Getter
    public enum CacheType {
        stored_cache("stored_cache_"),
        suspended_cache("suspended_cache_"),
        ;

        CacheType(String key) {
            this.key = key;
        }

        private String key;
    }
}

2.1.4 Work order queue manager (WorkOrderQueueManager)
This class is built on the Redis ZSet sorted-set structure, so work orders can be dequeued in priority order.
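Dequeuing by priority boils down to "read the lowest-score member, then remove it", and those two steps must not interleave with another consumer. The sketch below shows that shape in memory, with a `ReentrantLock` standing in for the distributed lock. `PopMinSketch` is a hypothetical name, and for brevity it assumes every member has a distinct score.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

/** Two-step "peek then remove" dequeue guarded by a lock (hypothetical in-memory sketch). */
class PopMinSketch {
    // score -> member; assumes distinct scores, which a real ZSet does not require
    private final TreeMap<Double, String> queue = new TreeMap<>();
    // Stand-in for the Redis distributed lock used in the article
    private final ReentrantLock lock = new ReentrantLock();

    void add(String member, double score) {
        lock.lock();
        try {
            queue.put(score, member);
        } finally {
            lock.unlock();
        }
    }

    /** Returns and removes the member with the lowest score, or null if the queue is empty. */
    String popMin() {
        lock.lock();
        try {
            Map.Entry<Double, String> first = queue.firstEntry(); // step 1: read the minimum
            if (first == null) {
                return null;
            }
            queue.remove(first.getKey());                         // step 2: remove it
            return first.getValue();
        } finally {
            lock.unlock(); // released only by the holder, since lock() succeeded above
        }
    }
}
```

Redis 5.0 and later also provide a native ZPOPMIN command that performs both steps atomically on the server, which would remove the need for a lock where it is available.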
Methods:
- String getRedisKey(WorkOrderContext context): reads the queue type from the WorkOrderContext and returns the key under which the set is stored.
- Long queueSize(WorkOrderContext context): returns the queue size.
- Boolean leftPush(WorkOrderContext context): enqueues an element.
- Boolean leftPushIfAbsent(WorkOrderContext context): enqueues the element only if it is absent; returns true if it was enqueued, false otherwise.
- Long remove(WorkOrderContext context): removes an element from the queue.
- WorkOrderContext.WorkOrder pop(WorkOrderContext context): dequeues the member with the lowest score.
- Set<WorkOrderContext.WorkOrder> rank(WorkOrderContext context): lists all members of the queue in ascending order.
- Long removeRange(String key, long start, long end): removes queue elements by rank range.
- Long removeValues(String key, List<String> values): removes the given elements.
- long getDelayedTime(WorkOrderContext.QueueType queueType, String workCode): returns the delay time of a work order (for suspended and stored work orders).

/**
 * @description: Work order queue manager
 * @Date: 2020/7/14 4:28 PM
 * @Author: 石冬冬-SeigHeil
 */
@Component
@Slf4j
public class WorkOrderQueueManager extends AbstractCacheManager {
    final String LOCK_KEY = "ZSET_ATOMIC_LOCK";

    @Autowired
    ZSetOperations<String, Object> zSetOperations;
    @Autowired
    WorkOrderCacheManager workOrderCacheManager;

    /** Returns the Redis key of the queue selected by the context's queue type. */
    String getRedisKey(WorkOrderContext context) {
        String keySuffix = null;
        switch (context.getQueueType()) {
            case immediate:
                keySuffix = CarthageConst.WorkOrderKey.IMMEDIATE_QUEUE_DEFAULT;
                break;
            case stored:
                keySuffix = CarthageConst.WorkOrderKey.STORED_QUEUE_DEFAULT;
                break;
            case suspended:
                keySuffix = CarthageConst.WorkOrderKey.SUSPENDED_QUEUE_DEFAULT;
                break;
            default:
                break;
        }
        if (null != keySuffix) {
            if (context.isTest()) {
                keySuffix += CarthageConst.TEST_SUFFIX;
            }
            return redisService.getKey(keySuffix);
        }
        return null;
    }

    /** Returns the queue size. */
    public Long queueSize(WorkOrderContext context) {
        return zSetOperations.size(getRedisKey(context));
    }

    /** Enqueues an element. */
    public Boolean leftPush(WorkOrderContext context) {
        String redisKey = getRedisKey(context);
        String workCode = context.getWorOrder().getWorkCode();
        double priority = context.getWorOrder().getPriority();
        Boolean action = zSetOperations.add(redisKey, workCode, priority);
        if (Objects.equals(Boolean.FALSE, action)) {
            // add() returned false: check whether the member is already in the queue
            Long value = zSetOperations.rank(redisKey, workCode);
            log.info("[Queue.leftPush], hasLeftPushed, action={}, value={}, context={}", action, value, JSONObject.toJSON(context));
            if (Objects.nonNull(value)) {
                return Boolean.TRUE;
            }
        }
        log.info("[Queue.leftPush] context={}", JSONObject.toJSON(context));
        retry(MAX_RETRIES, context, idx -> action);
        return Optional.ofNullable(action).orElse(Boolean.FALSE);
    }

    /**
     * Enqueues an element only if it is absent.
     * Returns true if the element was enqueued, false otherwise.
     */
    public Boolean leftPushIfAbsent(WorkOrderContext context) {
        String redisKey = getRedisKey(context);
        String workCode = context.getWorOrder().getWorkCode();
        double priority = context.getWorOrder().getPriority();
        Boolean action = zSetOperations.add(redisKey, workCode, priority);
        log.info("[WorkOrderQueue.leftPushIfAbsent], action={}, context={}", action, JSONObject.toJSON(context));
        return Optional.ofNullable(action).orElse(Boolean.FALSE);
    }

    /** Removes an element from the queue. */
    public Long remove(WorkOrderContext context) {
        String redisKey = getRedisKey(context);
        String workCode = context.getWorOrder().getWorkCode();
        log.info("[WorkOrderQueue.remove] context={}", JSONObject.toJSON(context));
        Long rem = zSetOperations.remove(redisKey, workCode);
        Long action = Optional.ofNullable(rem).orElse(0L);
        retry(MAX_RETRIES, context, idx -> action.longValue() > 0);
        return action;
    }

    /** Dequeues the member with the lowest score. */
    public WorkOrderContext.WorkOrder pop(WorkOrderContext context) {
        WorkOrderContext.WorkOrder workOrder = null;
        boolean locked = false;
        try {
            String redisKey = getRedisKey(context);
            // Emulate the ZSet ZPOPMIN command under a distributed lock
            locked = redisService.lock(LOCK_KEY, 5000);
            if (locked) {
                // 1. Read the member with the lowest score
                Set<ZSetOperations.TypedTuple<Object>> set = redisService.zSetOperations().rangeWithScores(redisKey, 0, 0);
                if (set.isEmpty()) {
                    return null;
                }
                // 2. Remove that member
                Long value = redisService.zSetOperations().removeRange(redisKey, 0, 0);
                retry(MAX_RETRIES, context, idx -> value.longValue() > 0);
                // 3. Build the dequeued member
                workOrder = WorkOrderContext.WorkOrder.builder().build();
                for (ZSetOperations.TypedTuple<Object> each : set) {
                    workOrder.setWorkCode(each.getValue().toString());
                    workOrder.setPriority(each.getScore());
                    workOrder.setDelayedTime(0);
                    break;
                }
            }
        } catch (Exception e) {
            log.error("[WorkOrderQueue.pop] exception ctx={}", JSONObject.toJSON(context), e);
        } finally {
            // Release the lock only if this call acquired it
            if (locked) {
                redisService.unlock(LOCK_KEY);
            }
        }
        return workOrder;
    }

    /** Lists all members of the queue in ascending order. */
    public Set<WorkOrderContext.WorkOrder> rank(WorkOrderContext context) {
        Set<ZSetOperations.TypedTuple<Object>> set = redisService.zSetOperations().rangeWithScores(getRedisKey(context), 0, -1);
        Set<WorkOrderContext.WorkOrder> members = Sets.newLinkedHashSetWithExpectedSize(set.size());
        set.forEach(each -> {
            WorkOrderContext.WorkOrder every = WorkOrderContext.WorkOrder.builder()
                    .workCode(each.getValue().toString())
                    .priority(each.getScore())
                    .delayedTime(getDelayedTime(context.getQueueType(), each.getValue().toString()))
                    .build();
            members.add(every);
        });
        return members;
    }

    /** Removes queue elements by rank range. */
    public Long removeRange(String key, long start, long end) {
        String redisKey = redisService.getKey(key);
        Long count = zSetOperations.removeRange(redisKey, start, end);
        log.info("[WorkOrderQueue.removeRange] redisKey={}, start={}, end={}, count={}", redisKey, start, end, count);
        return count;
    }

    /** Removes the given elements. */
    public Long removeValues(String key, List<String> values) {
        String redisKey = redisService.getKey(key);
        LongAdder longAdder = new LongAdder();
        values.forEach(each -> {
            Long count = zSetOperations.remove(redisKey, each);
            longAdder.add(count);
        });
        Long count = longAdder.longValue();
        log.info("[WorkOrderQueue.removeValues] redisKey={}, values={}, count={}", redisKey, JSONObject.toJSONString(values), count);
        return count;
    }

    /** Returns the delay time of a work order (for suspended and stored work orders). */
    long getDelayedTime(WorkOrderContext.QueueType queueType, String workCode) {
        long delayedTime = 0;
        WorkOrderCacheManager.CacheType cacheType = null;
        switch (queueType) {
            case suspended:
                cacheType = WorkOrderCacheManager.CacheType.suspended_cache;
                break;
            case stored:
                cacheType = WorkOrderCacheManager.CacheType.stored_cache;
                break;
            default:
                break;
        }
        if (null != cacheType) {
            WorkOrderCacheManager.CacheValue cacheValue = workOrderCacheManager.get(cacheType, workCode);
            if (null != cacheValue) {
                delayedTime = cacheValue.getDelayedTime();
            }
        }
        return delayedTime;
    }
}

2.2 Work order queue transfer manager
2.2.1 Work order queue transfer manager (WorkOrderQueueTransfer)
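This class makes "dequeue from the delay queue, enqueue into the immediate queue" a single atomic operation, implemented with a Redis distributed lock.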
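The shape of that transfer can be sketched in memory. This is an illustration under assumptions, not the article's implementation: `TransferSketch` is a hypothetical name, plain maps stand in for the two ZSets, a per-work-order `ReentrantLock` stands in for the Redis lock, and the priority is read from the delayed map rather than passed in through a context object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Moves a work order from a delayed queue to the immediate queue under a lock (hypothetical sketch). */
class TransferSketch {
    final Map<String, Double> delayed = new ConcurrentHashMap<>();    // workCode -> priority
    final Map<String, Double> immediate = new ConcurrentHashMap<>();  // workCode -> priority
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Returns true only if the work order was newly added to the immediate queue. */
    boolean transfer(String workCode) {
        ReentrantLock lock = locks.computeIfAbsent(workCode, k -> new ReentrantLock());
        if (!lock.tryLock()) {
            return false; // another thread is already moving this work order
        }
        try {
            Double priority = delayed.get(workCode);
            if (priority == null) {
                return false; // nothing to move
            }
            // 1. Enqueue into the immediate queue if absent
            boolean added = immediate.putIfAbsent(workCode, priority) == null;
            // 2. Remove from the delayed queue
            delayed.remove(workCode);
            return added;
        } finally {
            lock.unlock();
        }
    }
}
```

Locking per work order, as the article's lock key template does, lets different work orders be transferred concurrently while repeated expiry events for the same one are serialized.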
/**
 * @description: Work order queue transfer manager
 * @Date: 2020/7/23 6:15 PM
 * @Author: 石冬冬-SeigHeil
 */
@Component
@Slf4j
public class WorkOrderQueueTransfer extends AbstractCacheManager {
    final static String ATOMIC_KEY = "delayed_queue_key_expire_lock_{0}";
    final static long ATOMIC_KEY_EXPIRE = 5000;

    @Autowired
    RedisService redisService;
    @Autowired
    WorkOrderQueueManager workOrderQueueManager;
    @Autowired
    WorkOrderCacheManager workOrderCacheManager;

    /**
     * Moves a work order from the suspended or stored queue into the immediate queue.
     *
     * @param cacheType      suspended or stored
     * @param delayedContext context of the delayed work order
     */
    public Boolean transferImmediateQueue(WorkOrderCacheManager.CacheType cacheType, WorkOrderContext delayedContext) {
        boolean tryLock = false;
        Boolean done = Boolean.FALSE;
        String lockKey = null;
        try {
            WorkOrderContext.WorkOrder workOrder = delayedContext.getWorOrder();
            lockKey = redisService.getKey(MessageFormat.format(ATOMIC_KEY, workOrder.getWorkCode()));
            tryLock = redisService.lock(lockKey, ATOMIC_KEY_EXPIRE);
            if (tryLock) {
                // 1. Build the immediate-queue context and enqueue
                WorkOrderContext immediateContext = WorkOrderContext.buildImmediate(workOrder.getWorkCode(), workOrder.getPriority());
                done = workOrderQueueManager.leftPushIfAbsent(immediateContext);
                // 2. Remove the element from the current delay queue
                Long count = workOrderQueueManager.remove(delayedContext);
                log.info("[suspended|stored queue remove], count={}, delayedContext={}", count, JSONObject.toJSONString(delayedContext));
            }
        } catch (Exception e) {
            log.error("[transferImmediateQueue] exception, delayedContext={}, cacheType={}", JSONObject.toJSONString(delayedContext), cacheType, e);
        } finally {
            if (Objects.nonNull(lockKey) && tryLock) {
                redisService.unlock(lockKey);
            }
        }
        return Optional.ofNullable(done).orElse(Boolean.FALSE);
    }
}

2.3 Listening for expired Redis keys
2.3.1 Redis expiration callback listener (RedisKeyExpirationListener)
/**
 * @description: Redis expiration callback listener
 * @Date: 2020/7/18 10:43 AM
 * @Author: 石冬冬-SeigHeil
 */
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    final static String STORED_CACHE_KEY_PREFIX = WorkOrderCacheManager.CacheType.stored_cache.getKey();
    final static String SUSPENDED_CACHE_KEY_PREFIX = WorkOrderCacheManager.CacheType.suspended_cache.getKey();

    @Autowired
    TraceLogService traceLogService;
    @Autowired
    RedisService redisService;
    @Autowired
    WorkOrderService workOrderService;
    @Autowired
    DelayedScheduledOperateBridge delayedScheduledOperateBridge;

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        Date startTime = TimeTools.createNowTime();
        String expiredKey = message.toString();
        String bizPrefix = redisService.getKeyPrefix().getName();
        // Ignore keys that do not belong to this application
        if (!expiredKey.startsWith(bizPrefix)) {
            return;
        }
        String caseOfStored = redisService.getKey(STORED_CACHE_KEY_PREFIX);
        String caseOfSuspended = redisService.getKey(SUSPENDED_CACHE_KEY_PREFIX);
        WorkOrderCacheManager.CacheType cacheType;
        WorkOrderContext.QueueType queueType;
        if (expiredKey.startsWith(caseOfStored)) {
            queueType = WorkOrderContext.QueueType.stored;
            cacheType = WorkOrderCacheManager.CacheType.stored_cache;
        } else if (expiredKey.startsWith(caseOfSuspended)) {
            queueType = WorkOrderContext.QueueType.suspended;
            cacheType = WorkOrderCacheManager.CacheType.suspended_cache;
        } else {
            return;
        }
        String workCode = getWorkCode(expiredKey);
        log.info("Redis key=[{}] has expired", expiredKey);
        if (Objects.nonNull(workCode)) {
            log.info("Redis key=[{}] expired, start handling suspended|stored work order, workCode={}", expiredKey, workCode);
            WorkOrder workOrder = workOrderService.queryOne(workCode);
            if (Objects.isNull(workOrder)) {
                log.info("Redis key=[{}] expired, start handling suspended|stored work order, work order not found, workCode={}", expiredKey, workCode);
                return;
            }
            WorkOrderContext delayedContext = WorkOrderContext.builder()
                    .worOrder(WorkOrderContext.WorkOrder.builder()
                            .delayedTime(5)
                            .priority(workOrder.getCasePriority())
                            .workCode(workOrder.getWorkCode())
                            .build())
                    .queueType(queueType)
                    .build();
            Boolean done = delayedScheduledOperateBridge.transferImmediateQueue(cacheType, delayedContext);
            saveTraceLog(delayedContext, done, traceLog -> {
                JSONObject requestBody = new JSONObject();
                requestBody.put("expiredKey", expiredKey);
                requestBody.put("workCode", workCode);
                traceLog.setRequestBody(requestBody.toJSONString());
                traceLog.setRequestTime(startTime);
            });
        }
    }

    /**
     * Persists a trace log record.
     *
     * @param context  work order context
     * @param done     whether the transfer succeeded
     * @param consumer callback that fills in the request fields
     */
    void saveTraceLog(WorkOrderContext context, Boolean done, Consumer<TraceLog> consumer) {
        try {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            JSONObject responseBody = new JSONObject();
            responseBody.put("workOrderContext", context);
            responseBody.put("transferImmediateQueue", done);
            TraceLog traceLog = TraceLog.builder()
                    .appCode(context.getWorOrder().getWorkCode())
                    .url("[" + hostAddress + "]redisKeyExpirationListener.onMessage")
                    .target(this.getClass().getPackage().getName() + "." + this.getClass().getSimpleName())
                    .responseBody(responseBody.toJSONString())
                    .responseTime(TimeTools.createNowTime())
                    .traceType(TraceTypeEnum.REDIS_KEY_EXPIRE.getIndex())
                    .build();
            consumer.accept(traceLog);
            traceLogService.insertRecord(traceLog);
        } catch (Exception e) {
            log.error("saveTraceLog exception, [context={}]", JSONObject.toJSONString(context), e);
        }
    }

    /** Extracts the work order number from the key: everything after the last underscore. */
    String getWorkCode(String value) {
        return value.substring(value.lastIndexOf("_") + 1);
    }
}

2.4 Bridge for scheduled handling of delayed work orders
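The main job of this class is to move queue elements through WorkOrderQueueTransfer and, at the same time, apply the corresponding database updates to the work order through OperateStrategyManager.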
/**
 * @description: Bridge for scheduled handling of delayed work orders.
 * Scenario: handling a suspended|stored work order once its scheduled time arrives.
 * @Date: 2020/7/23 22:20
 * @Author: SeigHeil
 */
@Slf4j
@Component
public class DelayedScheduledOperateBridge {
    static final String LOCK_KEY = CarthageConst.KEY_EXPIRE_LISTENER_LOCK;
    static final long EXPIRE_SECONDS = 120;

    @Autowired
    RedisService redisService;
    @Autowired
    WorkOrderQueueTransfer workOrderQueueTransfer;
    @Autowired
    OperateStrategyManager operateStrategyManager;

    /**
     * Moves the work order from the delay queue to the immediate queue and updates its status.
     *
     * @param cacheType      suspended or stored
     * @param delayedContext context of the delayed work order
     */
    public Boolean transferImmediateQueue(WorkOrderCacheManager.CacheType cacheType, WorkOrderContext delayedContext) {
        String workCode = delayedContext.getWorOrder().getWorkCode();
        boolean tryLock = false;
        String redisKey = null;
        try {
            redisKey = redisService.getKey(MessageFormat.format(LOCK_KEY, workCode));
            tryLock = redisService.lock(redisKey, EXPIRE_SECONDS);
            if (!tryLock) {
                log.info("[DelayedScheduledOperateBridge.tryLock={} lock not acquired, redisKey={}] suspended|stored scheduled handling (idempotent), workCode={}", tryLock, redisKey, workCode);
            }
            if (tryLock) {
                log.info("[DelayedScheduledOperateBridge.tryLock={} lock acquired, redisKey={}] suspended|stored scheduled handling (idempotent), workCode={}", tryLock, redisKey, workCode);
                Boolean done = workOrderQueueTransfer.transferImmediateQueue(cacheType, delayedContext);
                if (!done.booleanValue()) {
                    return Boolean.FALSE;
                }
                OperateContext operateContext = OperateContext.builder()
                        .operateStrategyEnum(OperateContext.OperateStrategyEnum.DELAYED_SCHEDULED_ORDER)
                        .operateParam(OperateContext.OperateParam.builder().workCode(workCode).build())
                        .build();
                operateStrategyManager.execute(operateContext);
                log.info("[DelayedScheduledOperateBridge.transferImmediateQueue], delayedContext={}, callResult={}", JSONObject.toJSONString(delayedContext), JSONObject.toJSONString(operateContext.getExecuteResult()));
                return operateContext.getExecuteResult().isSuccess();
            }
        } catch (Exception e) {
            log.error("[DelayedScheduledOperateBridge] suspended|stored scheduled handling failed, workCode={}, delayedContext={}", workCode, JSONObject.toJSONString(delayedContext), e);
        } finally {
            if (tryLock) {
                redisService.unlock(redisKey);
            }
        }
        return false;
    }
}

2.5 Work order operation manager
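This class is the public entry point for the work order operation strategies. Callers do not need to know that the strategy classes exist; this class is responsible for wiring up the strategy instances.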
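The pattern here is a registry that maps an operation type to a strategy and exposes a single `execute` entry point. A minimal, framework-free sketch might look like this; `StrategyRegistrySketch`, its `Operation` enum, and the string results are all hypothetical, and plain functions stand in for the Spring-managed strategy beans.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

/** Enum-keyed strategy registry with a single entry point (hypothetical sketch). */
class StrategyRegistrySketch {
    enum Operation { SUSPEND, STORE, FINISH }

    // Operation -> strategy; each strategy takes a work order number and returns a result message
    private static final Map<Operation, Function<String, String>> STRATEGIES = new EnumMap<>(Operation.class);

    static {
        STRATEGIES.put(Operation.SUSPEND, workCode -> workCode + ":suspended");
        STRATEGIES.put(Operation.STORE, workCode -> workCode + ":stored");
        // FINISH is deliberately left unregistered to show the failure path
    }

    /** Looks up the strategy for the operation and runs it. */
    static String execute(Operation operation, String workCode) {
        Function<String, String> strategy = STRATEGIES.get(operation);
        if (strategy == null) {
            throw new IllegalArgumentException("No strategy registered for " + operation);
        }
        return strategy.apply(workCode);
    }
}
```

Checking for a missing strategy up front gives a clear error instead of a NullPointerException when an enum value has no registered implementation.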
- OPERATE_STRATEGY_MAP maintains the mapping from enum values to strategy beans.
- init() initializes the OPERATE_STRATEGY_MAP container.
- Result execute(OperateContext context) is the method through which callers run a strategy.

/**
 * @description: GPS work order operation strategy manager
 * @Date: 2020/7/15 5:43 PM
 * @Author: 石冬冬-SeigHeil
 */
@Component
@Slf4j
public class OperateStrategyManager {
    static final Map<OperateContext.OperateStrategyEnum, AbstractOperateStrategy> OPERATE_STRATEGY_MAP = Maps.newHashMapWithExpectedSize(6);

    @Autowired
    CreateOperateStrategy createOperateStrategy;
    @Autowired
    AllotOrderOperateStrategy allotOrderOperateStrategy;
    @Autowired
    SubmitWithFinishOperateStrategy submitWithFinishOperateStrategy;
    @Autowired
    SubmitWithStoreOperateStrategy submitWithStoreOperateStrategy;
    @Autowired
    SubmitWithSuspendOperateStrategy submitWithSuspendOperateStrategy;
    @Autowired
    DelayedScheduledOperateStrategy delayedScheduledOperateStrategy;
    @Autowired
    AssignOrderOperateStrategy assignOrderOperateStrategy;

    @PostConstruct
    private void init() {
        OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.INIT_WORK_ORDER, createOperateStrategy);
        OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.ALLOT_WORK_ORDER, allotOrderOperateStrategy);
        OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.STORE_WORK_ORDER, submitWithStoreOperateStrategy);
        OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.SUSPEND_WORK_ORDER, submitWithSuspendOperateStrategy);
        OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.FINISH_WORK_ORDER, submitWithFinishOperateStrategy);
        OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.DELAYED_SCHEDULED_ORDER, delayedScheduledOperateStrategy);
        OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.ASSIGN_ORDER, assignOrderOperateStrategy);
    }

    /**
     * Entry point through which callers run a strategy.
     *
     * @param context operation context
     */
    public Result execute(OperateContext context) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("OperateStrategyManager.execute");
        AbstractOperateStrategy operateStrategy = OPERATE_STRATEGY_MAP.get(context.getOperateStrategyEnum());
        context.buildExecuteResultWithSuccess();
        operateStrategy.execute(context);
        Result executeResult = context.getExecuteResult();
        if (context.getExecuteResult().isSuccess()) {
            return Result.suc(executeResult.getMsg());
        }
        stopWatch.stop();
        long spendMillSeconds = stopWatch.getLastTaskTimeMillis();
        long duration = (System.currentTimeMillis() - spendMillSeconds) / 1000;
        String executeResultMsg = executeResult.getMsg();
        log.info("[execute] done, duration={}, executeResultMsg={}", duration, executeResultMsg);
        return Result.fail(RemoteEnum.FAILURE, executeResultMsg);
    }
}

2.6 Work order strategy implementations
Work orders go through operations such as create, suspend, store, and finish, so these cases are a natural fit for strategy classes.
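The suspend and store strategies in this section both convert a configured delay into a number of seconds, which becomes the expiration of the cache key. The arithmetic can be isolated as follows; `DelaySketch` is a hypothetical helper, and the unit names match the string cases used by the suspend strategy's switch statement.

```java
import java.util.concurrent.TimeUnit;

/** Converts a configured delay into seconds for use as a key TTL (hypothetical helper). */
class DelaySketch {
    /**
     * @param timeUnit one of "DAY", "HOUR", "MINUTE", "SECOND"
     * @param value    the amount in that unit
     * @return the delay in seconds, or 0 for an unknown unit
     */
    static long delaySeconds(String timeUnit, int value) {
        switch (timeUnit) {
            case "DAY":
                return TimeUnit.DAYS.toSeconds(value);
            case "HOUR":
                return TimeUnit.HOURS.toSeconds(value);
            case "MINUTE":
                return TimeUnit.MINUTES.toSeconds(value);
            case "SECOND":
                return value;
            default:
                return 0;
        }
    }
}
```

With the delays from the requirement, a first suspension of 2 hours gives `delaySeconds("HOUR", 2)` = 7200 and a second suspension of 12 hours gives 43200. Using `TimeUnit` and a `long` result avoids hand-written multipliers and int overflow for large values.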
2.6.1 Suspend operation
/**
 * @description: Submit survey result (suspend operation), strategy class
 * @Date: 2020/7/15 5:32 PM
 * @Author: 石冬冬-SeigHeil
 */
@Slf4j
@Component
public class SubmitWithSuspendOperateStrategy extends AbstractSubmitOperateStrategy {
    static final Map<MoveToEnum, AttentionEvent> suspend_to_attention_event_map = new HashMap<>();
    static final Map<MoveToEnum, WorkOrderStatusEnum.SubStatusEnum> suspend_to_sub_status_map = new HashMap<>();
    static final Map<MoveToEnum, Integer> suspend_count_map = new HashMap<>();

    static {
        suspend_to_attention_event_map.put(MoveToEnum.SUSPENDED_AT_ONCE, AttentionEventEnum.SUSPENDED_AT_ONCE);
        suspend_to_attention_event_map.put(MoveToEnum.SUSPENDED_AT_TWICE, AttentionEventEnum.SUSPENDED_AT_TWICE);
        suspend_to_sub_status_map.put(MoveToEnum.SUSPENDED_AT_ONCE, WorkOrderStatusEnum.SubStatusEnum.SUSPENDED_AT_ONCE);
        suspend_to_sub_status_map.put(MoveToEnum.SUSPENDED_AT_TWICE, WorkOrderStatusEnum.SubStatusEnum.SUSPENDED_AT_TWICE);
        suspend_count_map.put(MoveToEnum.SUSPENDED_AT_ONCE, 1);
        suspend_count_map.put(MoveToEnum.SUSPENDED_AT_TWICE, 2);
        log.info("init...suspend_to_attention_event_map={}", suspend_to_attention_event_map.toString());
        log.info("init...suspend_to_sub_status_map={}", suspend_to_sub_status_map.toString());
        log.info("init...suspend_count_map={}", suspend_count_map.toString());
    }

    @Autowired
    DiamondConfigProxy diamondConfigProxy;

    @Override
    public void prepare(OperateContext context) {
        super.prepare(context);
        SurveyResult surveyResult = context.getSurveyResult();
        MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo());
        AttentionEvent attentionEvent = suspend_to_attention_event_map.getOrDefault(moveToEnum, null);
        ATTENTION_EVENT_CONTEXT.set(attentionEvent);
        context.setAttentionEvent(attentionEvent);
    }

    @Override
    WorkOrder buildWorkOrder(OperateContext context) {
        SurveyResult surveyResult = context.getSurveyResult();
        MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo());
        WorkOrder workOrder = super.buildWorkOrder(context);
        workOrder.setSuspendedCount(suspend_count_map.getOrDefault(moveToEnum, 0).intValue());
        workOrder.setMainStatus(WorkOrderStatusEnum.WAITING.getIndex());
        workOrder.setSubStatus(suspend_to_sub_status_map.get(moveToEnum).getIndex());
        workOrder.setIsFinished(Const.NON_INDEX);
        workOrder.setIsStore(Const.NON_INDEX);
        workOrder.setDelayedTime(context.getOperateParam().getDelayedTime());
        return workOrder;
    }

    @Override
    void operationExtend(OperateContext context) {
        long delayedTime = context.getOperateParam().getDelayedTime().getTime();
        int delayedSeconds = context.getOperateParam().getDelayedSeconds();
        WorkOrder workOrder = context.getWorkOrder();
        // Enqueue into the suspended queue
        WorkOrderContext cxt = WorkOrderContext.buildSuspended(workOrder.getWorkCode(), workOrder.getCasePriority(), delayedTime);
        workOrderQueueManager.leftPush(cxt);
        // Store the expiring key whose expiration triggers the transfer
        WorkOrderCacheManager.CacheValue cacheValue = WorkOrderCacheManager.CacheValue.buildSuspended(workOrder.getWorkCode(), workOrder.getCasePriority(), delayedTime, delayedSeconds);
        workOrderCacheManager.setCacheInExpire(cacheValue);
        super.operationExtend(context);
    }

    @Override
    public void setDelayedTime(OperateContext context) {
        SurveyResult surveyResult = context.getSurveyResult();
        MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo());
        DiamondConfig.SuspendOrderConfig suspendOrderConfig = diamondConfigProxy.suspendOrderConfig();
        Date delayedTime = TimeTools.createNowTime();
        int timeUnit = Calendar.HOUR_OF_DAY;
        int delayedSeconds = 0;
        int value = suspendOrderConfig.getConfig().getOrDefault(moveToEnum.name(), 0);
        switch (suspendOrderConfig.getTimeUnit()) {
            case "DAY":
                timeUnit = Calendar.DAY_OF_YEAR;
                delayedSeconds = value * 24 * 3600;
                break;
            case "HOUR":
                timeUnit = Calendar.HOUR_OF_DAY;
                delayedSeconds = value * 3600;
                break;
            case "MINUTE":
                timeUnit = Calendar.MINUTE;
                delayedSeconds = value * 60;
                break;
            case "SECOND":
                timeUnit = Calendar.SECOND;
                delayedSeconds = value;
                break;
            default:
                break;
        }
        TimeTools.addTimeField(delayedTime, timeUnit, value);
        context.getOperateParam().setDelayedTime(delayedTime);
        context.getOperateParam().setDelayedSeconds(delayedSeconds);
    }
}

2.6.2 Store operation
/**
 * @description: Submit survey result (store operation), strategy class
 * @Date: 2020/7/15 5:32 PM
 * @Author: 石冬冬-SeigHeil
 */
@Slf4j
@Component
public class SubmitWithStoreOperateStrategy extends AbstractSubmitOperateStrategy {
    /** Number of seconds in one stored day. */
    static final int DAY_TO_SECONDS = 24 * 60 * 60;

    @Override
    public void prepare(OperateContext context) {
        ATTENTION_EVENT_CONTEXT.set(AttentionEventEnum.STORE_ORDER);
        context.setAttentionEvent(AttentionEventEnum.STORE_ORDER);
        super.prepare(context);
    }

    @Override
    public boolean paramCheck(OperateContext context) {
        if (Objects.isNull(context.getSurveyResult().getDelayedDays())) {
            context.buildExecuteResultWithFailure("[surveyResult.delayedDays] is empty!");
        } else if (context.getSurveyResult().getDelayedDays() <= 0) {
            // else-if avoids unboxing a null delayedDays; <= 0 matches the error message
            context.buildExecuteResultWithFailure("Waiting days [delayedDays] must be greater than 0!");
        }
        return super.paramCheck(context);
    }

    @Override
    WorkOrder buildWorkOrder(OperateContext context) {
        WorkOrder workOrder = super.buildWorkOrder(context);
        workOrder.setMainStatus(WorkOrderStatusEnum.PENDING.getIndex());
        workOrder.setSubStatus(WorkOrderStatusEnum.SubStatusEnum.STORED.getIndex());
        workOrder.setIsFinished(Const.NON_INDEX);
        workOrder.setIsStore(Const.YES_INDEX);
        // suspendedCount must be reset to 0 here: the dispatch flow after storing depends on this field
        workOrder.setSuspendedCount(0);
        workOrder.setDelayedTime(context.getOperateParam().getDelayedTime());
        return workOrder;
    }

    @Override
    void operationExtend(OperateContext context) {
        long delayedTime = context.getOperateParam().getDelayedTime().getTime();
        int delayedSeconds = context.getOperateParam().getDelayedSeconds();
        WorkOrder workOrder = context.getWorkOrder();
        // Enqueue into the stored queue
        WorkOrderContext cxt = WorkOrderContext.buildStored(workOrder.getWorkCode(), workOrder.getCasePriority(), delayedTime);
        workOrderQueueManager.leftPush(cxt);
        // Store the expiring key whose expiration triggers the transfer
        WorkOrderCacheManager.CacheValue cacheValue = WorkOrderCacheManager.CacheValue.buildStored(workOrder.getWorkCode(), workOrder.getCasePriority(), delayedTime, delayedSeconds);
        workOrderCacheManager.setCacheInExpire(cacheValue);
        super.operationExtend(context);
    }

    @Override
    public void setDelayedTime(OperateContext context) {
        int delayedDays = context.getSurveyResult().getDelayedDays();
        Date delayedTime = TimeTools.createNowTime();
        TimeTools.addTimeField(delayedTime, Calendar.DAY_OF_YEAR, delayedDays);
        context.getOperateParam().setDelayedTime(delayedTime);
        context.getOperateParam().setDelayedSeconds(delayedDays * DAY_TO_SECONDS);
    }
}

3. UI Overview
[Figure: system trace log]
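4. Summary
This feature was only a small part of the overall requirement (the full product specification runs to 33 pages), but it was a rich one. While researching technical approaches for the delay queue, I also considered alternatives: Java's built-in delay-aware queue, DelayQueue (not suitable for a distributed, multi-instance deployment), and RabbitMQ (which felt more complex to implement). In the end I chose Redis, because the solution can rely on its data features such as ZSet, String, and key expiration, and the implementation is fairly simple to get started with.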
Author: 秋夜无霜
Source: Juejin (掘金)
