1. Recycler对象池
Recycler抽象类的实现非常简单,只有三个方法:
获取对象:Recycler:get()
回收对象:Recycler:recycle()
创建对象:Recycler:newObject()
newObject为抽象方法,需要由实现类自己实现此方法来创建对象。
Recycler对象池目的是尽可能的重复利用同一线程创建的对象,同时为了避免占用过多的内存,回收对象时采用一定的比例回收对象(默认1/7规则 注释中有解释),未回收的对象由jvm垃圾回收机制来处理。
Recycler对象池的数据存储结构如下:
和创建stack相同的线程回收对象时存储在elements数组(pushNow方法回收),如果不是同一个线程则放入WeakOrderQueue队列中,等到需要get对象时且stack为空,会将WeakOrderQueue集合的所有元素根据一定的规则转移到stack的elements数组中。
2. 对象的获取
首先判断池中是否存在对象,如果由则优先从本地线程stack中获取Object,如果stack为空时,再将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则),然后再从stack获取Object并返回.
如果池中不存在对象,创建对象并返回。
如果池中不存在对象,创建对象并返回。
相关代码如下: View Code
???public final T get() { ???????if (maxCapacityPerThread == 0) { ???????????return newObject((Handle<T>) NOOP_HANDLE); ???????} ???????Stack<T> stack = threadLocal.get(); ???????DefaultHandle<T> handle = stack.pop();//首先判断池中是否存在对象,如果由则优先从本地线程stack中获取Object,如果stack为空时,再将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则),然后再从stack获取Object并返回. ???????if (handle == null) {//如果池中不存在对象,创建对象并返回。 ???????????handle = stack.newHandle(); ???????????handle.value = newObject(handle); ???????} ???????// ???????System.out.println(threadLocal.getClass() + "=" + stack + "=" + handle); ???????return (T) handle.value; ???} ???DefaultHandle<T> pop() { ???????int size = this.size; ???????if (size == 0) {首先判断池中是否存在对象// ???????????if (!scavenge()) {//如果stack为空时,再将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则) ???????????????return null; ???????????} ???????????size = this.size; ???????} ???????size--; ???????DefaultHandle ret = elements[size]; ???????elements[size] = null; ???????if (ret.lastRecycledId != ret.recycleId) { ???????????throw new IllegalStateException("recycled multiple times"); ???????} ???????ret.recycleId = 0; ???????ret.lastRecycledId = 0; ???????this.size = size; ???????return ret; ???} ???????boolean scavenge() { ???????// continue an existing scavenge, if any ???????if (scavengeSome()) { ???????????return true; ???????} ???????// reset our scavenge cursor ???????prev = null; ???????cursor = head; ???????return false; ???} ???boolean scavengeSome() { ???????WeakOrderQueue prev; ???????WeakOrderQueue cursor = this.cursor; ???????if (cursor == null) { ???????????prev = null; ???????????cursor = head; ???????????if (cursor == null) { ???????????????return false; ???????????} ???????} else { ???????????prev = this.prev; ???????} ???????boolean success = false; ???????do { ???????????if (cursor.transfer(this)) {//将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则) ???????????????success = true; ???????????????break; ???????????} ???????????WeakOrderQueue next = cursor.next; ???????????if (cursor.owner.get() == null) { ???????????????// If the thread associated with the queue is gone, unlink it, after ???????????????// performing a volatile read to confirm there is no data left to collect. ???????????????// We never unlink the first queue, as we don‘t want to synchronize on updating the head. ???????????????if (cursor.hasFinalData()) { ???????????????????for (;;) { ???????????????????????if (cursor.transfer(this)) { ???????????????????????????success = true; ???????????????????????} else { ???????????????????????????break; ???????????????????????} ???????????????????} ???????????????} ???????????????if (prev != null) { ???????????????????prev.setNext(next); ???????????????} ???????????} else { ???????????????prev = cursor; ???????????} ???????????cursor = next; ???????} while (cursor != null && !success);//遍历所有的WeakOrderQueue ???????this.prev = prev; ???????this.cursor = cursor; ???????return success; ???} ???boolean transfer(Stack<?> dst) { ???????Link head = this.head.link; ???????if (head == null) { ???????????return false; ???????} ???????if (head.readIndex == LINK_CAPACITY) { ???????????if (head.next == null) { ???????????????return false; ???????????} ???????????this.head.link = head = head.next; ???????} ???????final int srcStart = head.readIndex; ???????int srcEnd = head.get(); ???????final int srcSize = srcEnd - srcStart; ???????if (srcSize == 0) { ???????????return false; ???????} ???????final int dstSize = dst.size; ???????final int expectedCapacity = dstSize + srcSize; ???????if (expectedCapacity > dst.elements.length) { ???????????final int actualCapacity = dst.increaseCapacity(expectedCapacity); ???????????srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd); ???????} ???????if (srcStart != srcEnd) { ???????????final DefaultHandle[] srcElems = head.elements; ???????????final DefaultHandle[] dstElems = dst.elements; ???????????int newDstSize = dstSize; ???????????for (int i = srcStart; i < srcEnd; i++) {//将queue中所有元素按照(1/7规则)保存到stack中 ???????????????DefaultHandle element = srcElems[i]; ???????????????if (element.recycleId == 0) { ???????????????????element.recycleId = element.lastRecycledId; ???????????????} else if (element.recycleId != element.lastRecycledId) { ???????????????????throw new IllegalStateException("recycled already"); ???????????????} ???????????????srcElems[i] = null; ???????????????if (dst.dropHandle(element)) { ???????????????????// Drop the object. ???????????????????continue; ???????????????} ???????????????element.stack = dst; ???????????????dstElems[newDstSize++] = element; ???????????} ???????????if (srcEnd == LINK_CAPACITY && head.next != null) { ???????????????// Add capacity back as the Link is GCed. ???????????????this.head.reclaimSpace(LINK_CAPACITY); ???????????????this.head.link = head.next; ???????????} ???????????head.readIndex = srcEnd; ???????????if (dst.size == newDstSize) { ???????????????return false; ???????????} ???????????dst.size = newDstSize; ???????????return true; ???????} else { ???????????// The destination stack is full already. ???????????return false; ???????} ???} ???????
3. 回收对象
如果当前回收的线程是原始线程(创建对象的线程),那么调用pushNow,如果超过Stack的容量(默认4*1024)或者1/7规则 就drop(由jvm回收释放)
如果当前回收的线程不是原始线程,那么调用pushLater,如果当前线程是第一次回收原始线程的对象,需要由当前线程创建的WeakOrderQueue(原始线程的stack对象中可以关联其他线程的WeakOrderQueue)。如果创建队列成功,则进入该队列;失败(queue个数大于maxDelayedQueues)则放弃该对象(由jvm回收释放)
如果当前回收的线程不是原始线程,那么调用pushLater,如果当前线程是第一次回收原始线程的对象,需要由当前线程创建的WeakOrderQueue(原始线程的stack对象中可以关联其他线程的WeakOrderQueue)。如果创建队列成功,则进入该队列;失败(queue个数大于maxDelayedQueues)则放弃该对象(由jvm回收释放)
注释:1)由本地线程的WeakOrderQueue回收对象,这么做的原因时避免并发(races竞争),而且不是每个对象都有对象回收池来回收,如果超过最大容量限制会放弃,而且本地线程stack采用1/8规则措施,而且将异地线程queue的对象转移到本地线程stack时也采取1/8规则措施
2)1/7规则是指每7个对象只留取1个回收,剩余部分放弃,如1-7个回收1个,8-15回收2个......
回收对象的相关代码如下:
回收对象的相关代码如下:
(1)DefaultHandle类 ???????@Override ???????public void recycle(Object object) { ???????????if (object != value) { ???????????????throw new IllegalArgumentException("object does not belong to handle"); ???????????} ???????????stack.push(this); ???????} ???????????(2)Stack类 ???????void push(DefaultHandle<?> item) { ???????Thread currentThread = Thread.currentThread(); ???????if (threadRef.get() == currentThread) { ???????????// The current Thread is the thread that belongs to the Stack, we can try to push the object now. ???????????pushNow(item); ???????} else { ???????????// The current Thread is not the one that belongs to the Stack ???????????// (or the Thread that belonged to the Stack was collected already), we need to signal that the push ???????????// happens later. ???????????pushLater(item, currentThread); ???????} ??????????(3)private void pushNow(DefaultHandle<?> item) { ???????????if ((item.recycleId | item.lastRecycledId) != 0) { ???????????????throw new IllegalStateException("recycled already"); ???????????} ???????????item.recycleId = item.lastRecycledId = OWN_THREAD_ID; ???????????int size = this.size; ???????????if (size >= maxCapacity || dropHandle(item)) {//为了避免回收对象数量太多,占用太多内存,采取了1/8规则措施,参数可调 ???????????????// Hit the maximum capacity or should drop - drop the possibly youngest object. ???????????????return; ???????????} ???????????if (size == elements.length) { ???????????????elements = Arrays.copyOf(elements, min(size << 1, maxCapacity)); ???????????} ???????????elements[size] = item; ???????????this.size = size + 1; ???????} ???????????????(4)private void pushLater(DefaultHandle<?> item, Thread thread) { ???????????// we don‘t want to have a ref to the queue as the value in our weak map ???????????// so we null it out; to ensure there are no races with restoring it later ???????????// we impose a memory ordering here (no-op on x86) ???????????Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); ???????????WeakOrderQueue queue = delayedRecycled.get(this); ???????????if (queue == null) { ???????????????if (delayedRecycled.size() >= maxDelayedQueues) {回收池的异地队列个数有限制,每个队列的容量也有限制 ???????????????????// Add a dummy queue so we know we should drop the object ???????????????????delayedRecycled.put(this, WeakOrderQueue.DUMMY); ???????????????????return; ???????????????} ???????????????// Check if we already reached the maximum number of delayed queues and if we can allocate at all. ???????????????if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { ???????????????????// drop object ???????????????????return; ???????????????} ???????????????delayedRecycled.put(this, queue); ???????????} else if (queue == WeakOrderQueue.DUMMY) { ???????????????// drop object ???????????????return; ???????????} ???????????queue.add(item); ???????} ???????(5)WeakOrderQueue类 ???????void add(DefaultHandle<?> handle) { ???????????handle.lastRecycledId = id; ???????????Link tail = this.tail; ???????????int writeIndex; ???????????if ((writeIndex = tail.get()) == LINK_CAPACITY) { ???????????????if (!head.reserveSpace(LINK_CAPACITY)) { ???????????????????// Drop it. ???????????????????return; ???????????????} ???????????????// We allocate a Link so reserve the space ???????????????this.tail = tail = tail.next = new Link(); ???????????????writeIndex = tail.get(); ???????????} ???????????tail.elements[writeIndex] = handle; ???????????handle.stack = null; ???????????// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; ???????????// this also means we guarantee visibility of an element in the queue if we see the index updated ???????????tail.lazySet(writeIndex + 1); ???????}
netty对象池使用与回收
原文地址:https://www.cnblogs.com/wangshifa666/p/9565237.html