一次性搞清楚,Java并发编程在各主流框架中的应用,保证看懂
Spring、Netty、Mybatis等框架的代码中大量运用了Java多线程编程技巧 。 并发编程处理的恰当与否 , 将直接影响架构的性能 。 本文通过对这些框架源码的分析 , 结合并发编程的常用技巧 , 来讲解多线程编程在这些主流框架中的应用 。
Java内存模型JVM规范定义了Java内存模型来屏蔽掉各种操作系统、虚拟机实现厂商和硬件的内存访问差异 , 以确保Java程序在所有操作系统和平台上能够达到一致的内存访问效果 。
工作内存和主内存
Java内存模型规定所有的变量都存储在主内存中 , 每个线程都有自己独立的工作内存 , 工作内存保存了对应该线程使用的变量的主内存副本拷贝 。 线程对这些变量的操作都在自己的工作内存中进行 , 不能直接操作主内存和其他工作内存中存储的变量或者变量副本 。 线程间的变量传递需通过主内存来完成 , 三者的关系如下图所示 。
2、可见性
可见性是指当一个线程修改了共享变量的值 , 其他线程能够立即得知这个修改 。 Java内存模型是通过在变量修改后将新值同步回主内存 , 在变量读取前从主内存刷新变量值这种依赖主内存作为传递媒介的方式来实现可见性的 , 无论是普通变量还是volatile变量都是如此 , 普通变量与volatile变量的区别是 , volatile的特殊规则保证了新值能立即同步到主内存 , 以及每次使用前立即从主内存刷新 。 因此 , 可以说volatile保证了多线程操作时变量的可见性 , 而普通变量则不能保证这一点 。 除了volatile外 , synchronized也提供了可见性 , synchronized的可见性是由“对一个变量执行unlock操作之前 , 必须先把此变量同步回主内存中(执行store、write操作)”这条规则获得 。
3、有序性
单线程环境下 , 程序会“有序的”执行 , 即:线程内表现为串行语义 。 但是在多线程环境下 , 由于指令重排 , 并发执行的正确性会受到影响 。 在Java中使用volatile和synchronized关键字 , 可以保证多线程执行的有序性 。 volatile通过加入内存屏障指令来禁止内存的重排序 。 synchronized通过加锁 , 保证同一时刻只有一个线程来执行同步代码 。
volatile的应用打开NioEventLoop的代码中 , 有一个控制IO操作和其他任务运行比例的 , 用volatile修饰的int类型字段ioRatio , 代码如下 。
privatevolatileintioRatio=50;这里为什么要用volatile修饰呢?我们首先对volatile关键字进行说明 , 然后再结合Netty的代码进行分析 。
关键字volatile是Java提供的最轻量级的同步机制 , Java内存模型对volatile专门定义了一些特殊的访问规则 。 下面我们就看它的规则 。 当一个变量被volatile修饰后 , 它将具备以下两种特性 。
线程可见性:当一个线程修改了被volatile修饰的变量后 , 无论是否加锁 , 其他线程都可以立即看到最新的修改 , 而普通变量却做不到这点 。 禁止指令重排序优化:普通的变量仅仅保证在该方法的执行过程中所有依赖赋值结果的地方都能获取正确的结果 , 而不能保证变量赋值操作的顺序与程序代码的执行顺序一致 。 举个简单的例子说明下指令重排序优化问题 , 代码如下:publicclassThreadStopExample{privatestaticbooleanstop;publicstaticvoidmain(String[]args)throwsInterruptedException{ThreadworkThread=newThread(newRunnable(){publicvoidrun(){inti=0;while(!stop){i++;try{TimeUnit.SECONDS.sleep(1);}catch(InterruptedExceptione){e.printStackTrace();}}}});workThread.start();TimeUnit.SECONDS.sleep(3);stop=true;}}我们预期程序会在3s后停止 , 但是实际上它会一直执行下去 , 原因就是虚拟机对代码进行了指令重排序和优化 , 优化后的指令如下:
if(!stop)While(true)......workThread线程在执行重排序后的代码时 , 是无法发现变量stop被其它线程修改的 , 因此无法停止运行 。 要解决这个问题 , 只要将stop前增加volatile修饰符即可 。 volatile解决了如下两个问题 。 第一 , 主线程对stop的修改在workThread线程中可见 , 也就是说workThread线程立即看到了其他线程对于stop变量的修改 。 第二 , 禁止指令重排序 , 防止因为重排序导致的并发访问逻辑混乱 。
一些人认为使用volatile可以代替传统锁 , 提升并发性能 , 这个认识是错误的 。 volatile仅仅解决了可见性的问题 , 但是它并不能保证互斥性 , 也就是说多个线程并发修改某个变量时 , 依旧会产生多线程问题 。 因此 , 不能靠volatile来完全替代传统的锁 。 根据经验总结 , volatile最适用的场景是“一个线程写 , 其他线程读” , 如果有多个线程并发写操作 , 仍然需要使用锁或者线程安全的容器或者原子变量来代替 。 下面我们继续对Netty的源码做分析 。 上面讲到了ioRatio被定义成volatile , 下面看看代码为什么要这样定义 。
finallongioTime=System.nanoTime()-ioStartTime;runAllTasks(ioTime*(100-ioRatio)/ioRatio);通过代码分析我们发现 , 在NioEventLoop线程中 , ioRatio并没有被修改 , 它是只读操作 。 既然没有修改 , 为什么要定义成volatile呢?继续看代码 , 我们发现NioEventLoop提供了重新设置IO执行时间比例的公共方法 。
publicvoidsetIoRatio(intioRatio){if(ioRatio<=0||ioRatio>100){thrownewIllegalArgumentException("ioRatio:"+ioRatio+"(expected:0<ioRatio<=100)");}this.ioRatio=ioRatio;}首先 , NioEventLoop线程没有调用该set方法 , 说明调整IO执行时间比例是外部发起的操作 , 通常是由业务的线程调用该方法 , 重新设置该参数 。 这样就形成了一个线程写、一个线程读 。 根据前面针对volatile的应用总结 , 此时可以使用volatile来代替传统的synchronized关键字 , 以提升并发访问的性能 。
ThreadLocal的应用及源码解析ThreadLocal又称为线程本地存储区(ThreadLocalStorage , 简称为TLS) , 每个线程都有自己的私有的本地存储区域 , 不同线程之间彼此不能访问对方的TLS区域 。 使用ThreadLocal变量的set(Tvalue)方法可以将数据存入该线程本地存储区 , 使用get()方法可以获取到之前存入的值 。
ThreadLocal的常见应用
不使用ThreadLocal 。
publicclassSessionBean{publicstaticclassSession{privateStringid;publicStringgetId(){returnid;}publicvoidsetId(Stringid){this.id=id;}}publicSessioncreateSession(){returnnewSession();}publicvoidsetId(Sessionsession,Stringid){session.setId(id);}publicStringgetId(Sessionsession){returnsession.getId();}publicstaticvoidmain(String[]args){//没有使用ThreadLocal , 在方法间共享session需要进行session在方法间的传递newThread(()->{SessionBeanbean=newSessionBean();Sessionsession=bean.createSession();bean.setId(session,"susan");System.out.println(bean.getId(session));}).start();}}上述代码中 , session需要在方法间传递才可以修改和读取 , 保证线程中各方法操作的是一个 。 下面看一下使用ThreadLocal的代码 。
publicclassSessionBean{//定义一个静态ThreadLocal变量session , 就能够保证各个线程有自己的一份 , 并且方法可以方便获取 , 不用传递privatestaticThreadLocalsession=newThreadLocal<>();publicstaticclassSession{privateStringid;publicStringgetId(){returnid;}publicvoidsetId(Stringid){this.id=id;}}publicvoidcreateSession(){session.set(newSession());}publicvoidsetId(Stringid){session.get().setId(id);}publicStringgetId(){returnsession.get().getId();}publicstaticvoidmain(String[]args){newThread(()->{SessionBeanbean=newSessionBean();bean.createSession();bean.setId("susan");System.out.println(bean.getId());}).start();}}在方法的内部实现中 , 直接可以通过session.get()获取到当前线程的session , 省掉了参数在方法间传递的环节 。
ThreadLocal的实现原理
一般 , 类属性中的数据是多个线程共享的 , 但ThreadLocal类型的数据声明为类属性 , 却可以为每一个使用它(通过set(Tvalue)方法)的线程存储线程私有的数据 , 通过其源码我们可以发现其中的原理 。
publicclassThreadLocal{/***下面的getMap()方法传入当前线程 , 获得一个ThreadLocalMap对象 , 说明每一个线程维护了*自己的一个map , 保证读取出来的value是自己线程的 。 **ThreadLocalMap是ThreadLocal静态内部类 , 存储value的键值就是ThreadLocal本身 。 **因此可以断定 , 每个线程维护一个ThreadLocalMap的键值对映射Map 。 不同线程的Map的key值是一样的 , *都是ThreadLocal , 但value是不同的 。 */publicTget(){Threadt=Thread.currentThread();ThreadLocalMapmap=getMap(t);if(map!=null){ThreadLocalMap.Entrye=map.getEntry(this);if(e!=null){@SuppressWarnings("unchecked")Tresult=(T)e.value;returnresult;}}returnsetInitialValue();}publicvoidset(Tvalue){Threadt=Thread.currentThread();ThreadLocalMapmap=getMap(t);if(map!=null)map.set(this,value);elsecreateMap(t,value);}}ThreadLocal在Spring中的使用
Spring事务处理的设计与实现中大量使用了ThreadLocal类 , 比如 , TransactionSynchronizationManager维护了一系列的ThreadLocal变量 , 用于存储线程私有的事务属性及资源 。 源码如下 。
/***管理每个线程的资源和事务同步的中心帮助程序 。 供资源管理代码使用 , 但不供典型应用程序代码使用 。 **资源管理代码应该检查线程绑定的资源 , 如 , JDBC连接或HibernateSessions 。 *此类代码通常不应该将资源绑定到线程 , 因为这是事务管理器的职责 。 另一个选项是 , *如果事务同步处于活动状态 , 则在首次使用时延迟绑定 , 以执行跨任意数量资源的事务 。 */publicabstractclassTransactionSynchronizationManager{/***一般是一个线程持有一个独立的事务 , 以相互隔离地处理各自的事务 。 *所以这里使用了很多ThreadLocal对象 , 为每个线程绑定对应的事务属性及资源 , *以便后续使用时能直接获取 。 */privatestaticfinalThreadLocal>resources=newNamedThreadLocal>("Transactionalresources");privatestaticfinalThreadLocal>synchronizations=newNamedThreadLocal>("Transactionsynchronizations");privatestaticfinalThreadLocalcurrentTransactionName=newNamedThreadLocal("Currenttransactionname");privatestaticfinalThreadLocalcurrentTransactionReadOnly=newNamedThreadLocal("Currenttransactionread-onlystatus");privatestaticfinalThreadLocalcurrentTransactionIsolationLevel=newNamedThreadLocal("Currenttransactionisolationlevel");privatestaticfinalThreadLocalactualTransactionActive=newNamedThreadLocal("Actualtransactionactive");/***为当前线程绑定对应的resource资源*/publicstaticvoidbindResource(Objectkey,Objectvalue)throwsIllegalStateException{ObjectactualKey=TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value,"Valuemustnotbenull");Mapmap=resources.get();//如果当前线程的resources中 , 绑定的数据map为空 , 则为resources绑定mapif(map==null){map=newHashMap();resources.set(map);}ObjectoldValue=https://pcff.toutiao.jxnews.com.cn/p/20200829/map.put(actualKey,value);if(oldValueinstanceofResourceHolder&&((ResourceHolder)oldValue).isVoid()){oldValue=null;}if(oldValue!=null){thrownewIllegalStateException("Alreadyvalue["+oldValue+"]forkey["+actualKey+"]boundtothread["+Thread.currentThread().getName()+"]");}if(logger.isTraceEnabled()){logger.trace("Boundvalue["+value+"]forkey["+actualKey+"]tothread["+Thread.currentThread().getName()+"]");}}/***返回当前线程绑定的所有资源*/publicstaticMapgetResourceMap(){Mapmap=resources.get();return(map!=null?Collections.unmodifiableMap(map):Collections.emptyMap());}}ThreadLocal在Mybatis中的使用
Mybatis的SqlSession对象也是各线程私有的资源 , 所以对其的管理也使用到了ThreadLocal类 。 源码如下 。
publicclassSqlSessionManagerimplementsSqlSessionFactory,SqlSession{privatefinalThreadLocallocalSqlSession=newThreadLocal<>();publicvoidstartManagedSession(){this.localSqlSession.set(openSession());}publicvoidstartManagedSession(booleanautoCommit){this.localSqlSession.set(openSession(autoCommit));}publicvoidstartManagedSession(Connectionconnection){this.localSqlSession.set(openSession(connection));}publicvoidstartManagedSession(TransactionIsolationLevellevel){this.localSqlSession.set(openSession(level));}publicvoidstartManagedSession(ExecutorTypeexecType){this.localSqlSession.set(openSession(execType));}publicvoidstartManagedSession(ExecutorTypeexecType,booleanautoCommit){this.localSqlSession.set(openSession(execType,autoCommit));}publicvoidstartManagedSession(ExecutorTypeexecType,TransactionIsolationLevellevel){this.localSqlSession.set(openSession(execType,level));}publicvoidstartManagedSession(ExecutorTypeexecType,Connectionconnection){this.localSqlSession.set(openSession(execType,connection));}publicbooleanisManagedSessionStarted(){returnthis.localSqlSession.get()!=null;}@OverridepublicConnectiongetConnection(){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotgetconnection.Nomanagedsessionisstarted.");}returnsqlSession.getConnection();}@OverridepublicvoidclearCache(){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotclearthecache.Nomanagedsessionisstarted.");}sqlSession.clearCache();}@Overridepublicvoidcommit(){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotcommit.Nomanagedsessionisstarted.");}sqlSession.commit();}@Overridepublicvoidcommit(booleanforce){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotcommit.Nomanagedsessionisstarted.");}sqlSession.commit(force);}@Overridepublicvoidrollback(){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotrollback.Nomanagedsessionisstarted.");}sqlSession.rollback();}@Overridepublicvoidrollback(booleanforce){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotrollback.Nomanagedsessionisstarted.");}sqlSession.rollback(force);}@OverridepublicListflushStatements(){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotrollback.Nomanagedsessionisstarted.");}returnsqlSession.flushStatements();}@Overridepublicvoidclose(){finalSqlSessionsqlSession=localSqlSession.get();if(sqlSession==null){thrownewSqlSessionException("Error:Cannotclose.Nomanagedsessionisstarted.");}try{sqlSession.close();}finally{localSqlSession.set(null);}}}J.U.C包的实际应用线程池ThreadPoolExecutor
首先通过ThreadPoolExecutor的源码看一下线程池的主要参数及方法 。
publicclassThreadPoolExecutorextendsAbstractExecutorService{/***核心线程数*当向线程池提交一个任务时 , 若线程池已创建的线程数小于corePoolSize , 即便此时存在空闲线程 , *也会通过创建一个新线程来执行该任务 , 直到已创建的线程数大于或等于corePoolSize*/privatevolatileintcorePoolSize;/***最大线程数*当队列满了 , 且已创建的线程数小于maximumPoolSize , 则线程池会创建新的线程来执行任务 。 *另外 , 对于无界队列 , 可忽略该参数*/privatevolatileintmaximumPoolSize;/***线程存活保持时间*当线程池中线程数超出核心线程数 , 且线程的空闲时间也超过keepAliveTime时 , *那么这个线程就会被销毁 , 直到线程池中的线程数小于等于核心线程数*/privatevolatilelongkeepAliveTime;/***任务队列*用于传输和保存等待执行任务的阻塞队列*/privatefinalBlockingQueueworkQueue;/***线程工厂*用于创建新线程 。 threadFactory创建的线程也是采用newThread()方式 , threadFactory*创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号 , n为线程池中线程的编号*/privatevolatileThreadFactorythreadFactory;/***线程饱和策略*当线程池和队列都满了 , 再加入的线程会执行此策略*/privatevolatileRejectedExecutionHandlerhandler;/***构造方法提供了多种重载 , 但实际上都使用了最后一个重载完成了实例化*/publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueworkQueue){this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,Executors.defaultThreadFactory(),defaultHandler);}publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueworkQueue,ThreadFactorythreadFactory){this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,defaultHandler);}publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueworkQueue,RejectedExecutionHandlerhandler){this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,Executors.defaultThreadFactory(),handler);}publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){if(corePoolSize<0||maximumPoolSize<=0||maximumPoolSize<corePoolSize||keepAliveTime<0)thrownewIllegalArgumentException();if(workQueue==null||threadFactory==null||handler==null)thrownewNullPointerException();this.corePoolSize=corePoolSize;this.maximumPoolSize=maximumPoolSize;this.workQueue=workQueue;this.keepAliveTime=unit.toNanos(keepAliveTime);this.threadFactory=threadFactory;this.handler=handler;}/***执行一个任务 , 但没有返回值*/publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();intc=ctl.get();if(workerCountOf(c)<corePoolSize){if(addWorker(command,true))return;c=ctl.get();}if(isRunning(c)&&workQueue.offer(command)){intrecheck=ctl.get();if(!isRunning(recheck)&&remove(command))reject(command);elseif(workerCountOf(recheck)==0)addWorker(null,false);}elseif(!addWorker(command,false))reject(command);}/***提交一个线程任务 , 有返回值 。 该方法继承自其父类AbstractExecutorService , 有多种重载 , 这是最常用的一个 。 *通过future.get()获取返回值(阻塞直到任务执行完)*/publicFuturesubmit(Callabletask){if(task==null)thrownewNullPointerException();RunnableFutureftask=newTaskFor(task);execute(ftask);returnftask;}/***关闭线程池 , 不再接收新的任务 , 但会把已有的任务执行完*/publicvoidshutdown(){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown();//hookforScheduledThreadPoolExecutor}finally{mainLock.unlock();}tryTerminate();}/***立即关闭线程池 , 已有的任务也会被抛弃*/publicListshutdownNow(){Listtasks;finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks=drainQueue();}finally{mainLock.unlock();}tryTerminate();returntasks;}publicbooleanisShutdown(){return!isRunning(ctl.get());}}线程池执行流程 , 如下图所示 。
CachedThreadPool:用来创建一个几乎可以无限扩大的线程池(最大线程数为Integer.MAX_VALUE) , 适用于执行大量短生命周期的异步任务 。 FixedThreadPool:创建一个固定大小的线程池 , 保证线程数可控 , 不会造成线程过多 , 导致系统负载更为严重 。 SingleThreadExecutor:创建一个单线程的线程池 , 可以保证任务按调用顺序执行 。 ScheduledThreadPool:适用于执行延时或者周期性任务 。如何配置线程池
CPU密集型任务尽量使用较小的线程池 , 一般为CPU核心数+1 。 因为CPU密集型任务使得CPU使用率很高 , 若开过多的线程数 , 会造成CPU过度切换 。 IO密集型任务可以使用稍大的线程池 , 一般为2*CPU核心数 。 IO密集型任务CPU使用率并不高 , 因此可以让CPU在等待IO的时候有其他线程去处理别的任务 , 充分利用CPU时间 。线程池的实际应用
Tomcat在分发web请求时使用了线程池来处理 。
BlockingQueue核心方法
publicinterfaceBlockingQueueextendsQueue{//将给定元素设置到队列中 , 如果设置成功返回true,否则返回false 。 如果是往限定了长度的队列中设置值 , 推荐使用offer()方法 。 booleanadd(Ee);//将给定的元素设置到队列中 , 如果设置成功返回true,否则返回false.e的值不能为空 , 否则抛出空指针异常 。 booleanoffer(Ee);//将元素设置到队列中 , 如果队列中没有多余的空间 , 该方法会一直阻塞 , 直到队列中有多余的空间 。 voidput(Ee)throwsInterruptedException;//将给定元素在给定的时间内设置到队列中 , 如果设置成功返回true,否则返回false.booleanoffer(Ee,longtimeout,TimeUnitunit)throwsInterruptedException;//从队列中获取值 , 如果队列中没有值 , 线程会一直阻塞 , 直到队列中有值 , 并且该方法取得了该值 。 Etake()throwsInterruptedException;//在给定的时间里 , 从队列中获取值 , 时间到了直接调用普通的poll()方法 , 为null则直接返回null 。 Epoll(longtimeout,TimeUnitunit)throwsInterruptedException;//获取队列中剩余的空间 。 intremainingCapacity();//从队列中移除指定的值 。 booleanremove(Objecto);//判断队列中是否拥有该值 。 publicbooleancontains(Objecto);//将队列中值 , 全部移除 , 并发设置到给定的集合中 。 intdrainTo(CollectionsuperE>c);//指定最多数量限制将队列中值 , 全部移除 , 并发设置到给定的集合中 。 intdrainTo(CollectionsuperE>c,intmaxElements);}主要实现类
ArrayBlockingQueue基于数组的阻塞队列实现 , 在ArrayBlockingQueue内部 , 维护了一个定长数组 , 以便缓存队列中的数据对象 , 这是一个常用的阻塞队列 , 除了一个定长数组外 , ArrayBlockingQueue内部还保存着两个整形变量 , 分别标识着队列的头部和尾部在数组中的位置 。 ArrayBlockingQueue在生产者放入数据和消费者获取数据时 , 都是共用同一个锁对象 , 由此也意味着两者无法真正并行运行 , 这点尤其不同于LinkedBlockingQueue 。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于 , 前者在插入或删除元素时不会产生或销毁任何额外的对象实例 , 而后者则会生成一个额外的Node对象 。 这在长时间内需要高效并发地处理大批量数据的系统中 , 其对于GC的影响还是存在一定的区别 。 而在创建ArrayBlockingQueue时 , 我们还可以控制对象的内部锁是否采用公平锁 , 默认采用非公平锁 。
LinkedBlockingQueue基于链表的阻塞队列 , 同ArrayListBlockingQueue类似 , 其内部也维持着一个数据缓冲队列(该队列由一个链表构成) , 当生产者往队列中放入一个数据时 , 队列会从生产者手中获取数据 , 并缓存在队列内部 , 而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值) , 才会阻塞生产者队列 , 直到消费者从队列中消费掉一份数据 , 生产者线程会被唤醒 , 反之对于消费者这端的处理也基于同样的原理 。 而LinkedBlockingQueue之所以能够高效的处理并发数据 , 还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步 , 这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据 , 以此来提高整个队列的并发性能 。 需要注意的是 , 如果构造一个LinkedBlockingQueue对象 , 而没有指定其容量大小 , LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE) , 这样的话 , 如果生产者的速度一旦大于消费者的速度 , 也许还没有等到队列满阻塞产生 , 系统内存就有可能已被消耗殆尽了 。
PriorityBlockingQueue基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定) , 但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者 , 而只会在没有可消费的数据时 , 阻塞数据的消费者 。 因此使用的时候要特别注意 , 生产者生产数据的速度绝对不能快于消费者消费数据的速度 , 否则时间一长 , 会最终耗尽所有的可用堆内存空间 。 在实现PriorityBlockingQueue时 , 内部控制线程同步的锁采用的是公平锁 。
CAS指令和原子类(应用比较多的就是计数器)【一次性搞清楚,Java并发编程在各主流框架中的应用,保证看懂】互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能的额外损耗 , 因此这种同步被称为阻塞同步 , 它属于一种悲观的并发策略 , 我们称之为悲观锁 。 随着硬件和操作系统指令集的发展和优化 , 产生了非阻塞同步 , 被称为乐观锁 。 简单地说 , 就是先进行操作 , 操作完成之后再判断操作是否成功 , 是否有并发问题 , 如果有则进行失败补偿 , 如果没有就算操作成功 , 这样就从根本上避免了同步锁的弊端 。
目前 , 在Java中应用最广泛的非阻塞同步就是CAS 。 从JDK1.5以后 , 可以使用CAS操作 , 该操作由sun.misc.Unsafe类里的compareAndSwapInt()和compareAndSwapLong()等方法实现 。 通常情况下sun.misc.Unsafe类对于开发者是不可见的 , 因此 , JDK提供了很多CAS包装类简化开发者的使用 , 如AtomicInteger 。 使用Java自带的Atomic原子类 , 可以避免同步锁带来的并发访问性能降低的问题 , 减少犯错的机会 。
全文完!
作者:yanglbme
转自自:
推荐阅读
- 萝卜做饺子馅时,到底要不要焯水?很多人都不清楚,吃完告诉家人
- 冠心病|冠心病心肌缺血不宜干什么活?能干什么活?心脏医生说的清清楚楚
- 包饺子时饺子馅总是搞不定?3个步骤教你做出美味的饺子馅
- 神农架|“神农架”深处为何不让前往?其中的古怪现象,科学家至今没搞懂
- 小学生的早餐这样吃,健康又营养足,早起半小时就搞定了,特香
- 科学家|狗到底是不是由狼驯而化来的?科学家至今没搞懂……
- 肾病|你真的了解肾病吗?清楚这4个问题,能改善预后!
- 面包不一定是烤出来的,还能直接蒸出来,全程一个小时就可以搞定
- 早餐,简单一冲就搞定,我隔三差五给孩子喝,高蛋白易吸收
- 一家三口的晚餐,半小时搞定,晒朋友圈,网友:家的味道,真香
