/*
 * Copyright 2013 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
import static java.lang.Math.max;
import static java.lang.Math.min;

/**
 * Light-weight object pool based on a thread-local stack.
 *
 * @param <T> the type of the pooled object
 */
public abstract class Recycler<T> {
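
    // Illustrative usage sketch (not part of the original source): a typical subclass supplies newObject()
    // and the pooled type keeps hold of its Handle so it can put itself back later. The type name
    // MyPooledObject below is hypothetical, not a Netty class.
    //
    //     private static final Recycler<MyPooledObject> RECYCLER = new Recycler<MyPooledObject>() {
    //         @Override
    //         protected MyPooledObject newObject(Handle<MyPooledObject> handle) {
    //             return new MyPooledObject(handle);
    //         }
    //     };
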
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);

    @SuppressWarnings("rawtypes")
    private static final Handle NOOP_HANDLE = new Handle() {
        @Override
        public void recycle(Object object) {
            // NOOP
        }
    };
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
    private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
    private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
    private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
    private static final int INITIAL_CAPACITY;
    private static final int MAX_SHARED_CAPACITY_FACTOR;
    private static final int MAX_DELAYED_QUEUES_PER_THREAD;
    private static final int LINK_CAPACITY;
    private static final int RATIO;

    static {
        // In the future, we might have different maxCapacity for different object types.
        // e.g. io.netty.recycler.maxCapacity.writeTask
        //      io.netty.recycler.maxCapacity.outboundBuffer
        int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
                SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
        if (maxCapacityPerThread < 0) {
            maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
        }

        DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;

        MAX_SHARED_CAPACITY_FACTOR = max(2,
                SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
                        2));

        MAX_DELAYED_QUEUES_PER_THREAD = max(0,
                SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
                        // We use the same value as default EventLoop number
                        NettyRuntime.availableProcessors() * 2));

        LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
                max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));

        // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
        // This should help to slowly increase the capacity of the recycler while not being too sensitive to
        // allocation bursts.
        RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

        if (logger.isDebugEnabled()) {
            if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
                logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
                logger.debug("-Dio.netty.recycler.ratio: disabled");
            } else {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
                logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
                logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
            }
        }

        INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
}
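
    // Note (added commentary): the defaults above can be overridden via system properties, e.g.
    //
    //     -Dio.netty.recycler.maxCapacityPerThread=8192 -Dio.netty.recycler.ratio=4
    //
    // Setting io.netty.recycler.maxCapacityPerThread to 0 disables pooling entirely, in which case get()
    // always allocates a fresh object with the no-op handle.
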
    private final int maxCapacityPerThread;
    private final int maxSharedCapacityFactor;
    private final int ratioMask;
    private final int maxDelayedQueuesPerThread;

    private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    ratioMask, maxDelayedQueuesPerThread);
        }

        @Override
        protected void onRemoval(Stack<T> value) {
            // Let us remove the WeakOrderQueue from the WeakHashMap directly if it's safe, to remove some overhead.
            if (value.threadRef.get() == Thread.currentThread()) {
                if (DELAYED_RECYCLED.isSet()) {
                    DELAYED_RECYCLED.get().remove(value);
                }
            }
        }
    };

    protected Recycler() {
        this(DEFAULT_MAX_CAPACITY_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread) {
        this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
        this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                       int ratio, int maxDelayedQueuesPerThread) {
        ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
        if (maxCapacityPerThread <= 0) {
            this.maxCapacityPerThread = 0;
            this.maxSharedCapacityFactor = 1;
            this.maxDelayedQueuesPerThread = 0;
        } else {
            this.maxCapacityPerThread = maxCapacityPerThread;
            this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
            this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
        }
    }

@SuppressWarnings(**"unchecked"**)
**public final** T get() {
**if** (**maxCapacityPerThread** \== 0) {
_//__表明没有开启池化
_ return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); _//__试图从__“__池__”__中取出一个,没有就新建一个 _ if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
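
    // Illustrative caller-side sketch (not part of the original source), assuming the hypothetical
    // MyPooledObject from the class-level example stores its Handle and exposes a recycle() method
    // that calls handle.recycle(this):
    //
    //     MyPooledObject obj = RECYCLER.get();   // pops from the thread-local Stack, or calls newObject(...)
    //     try {
    //         // ... use obj ...
    //     } finally {
    //         obj.recycle();                     // handle.recycle(obj) -> stack.push(handle)
    //     }
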
    /**
     * @deprecated use {@link Handle#recycle(Object)}.
     */
    @Deprecated
    public final boolean recycle(T o, Handle<T> handle) {
        if (handle == NOOP_HANDLE) {
            return false;
        }

        DefaultHandle<T> h = (DefaultHandle<T>) handle;
        if (h.stack.parent != this) {
            return false;
        }

        h.recycle(o);
        return true;
    }

    final int threadLocalCapacity() {
        return threadLocal.get().elements.length;
    }

    final int threadLocalSize() {
        return threadLocal.get().size;
    }

    protected abstract T newObject(Handle<T> handle);

    public interface Handle<T> {
        void recycle(T object);
    }

    static final class DefaultHandle<T> implements Handle<T> {
        private int lastRecycledId;
        private int recycleId;

        boolean hasBeenRecycled;

        private Stack<?> stack;
        private Object value;

        DefaultHandle(Stack<?> stack) {
            this.stack = stack;
        }

        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            if (lastRecycledId != recycleId || stack == null) {
                throw new IllegalStateException("recycled already");
            }

            // Release the finished object back into the pool.
            stack.push(this);
        }
    }
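
    // Note (added commentary): recycleId and lastRecycledId are how double-recycling is detected.
    // pushNow() stamps both with OWN_THREAD_ID; WeakOrderQueue.add() stamps only lastRecycledId with the
    // queue id, and transfer() later copies it into recycleId once the handle reaches the owning Stack.
    // pop() clears both back to 0. Any mismatch along the way raises "recycled already" or
    // "recycled multiple times".
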
    private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
            new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
        @Override
        protected Map<Stack<?>, WeakOrderQueue> initialValue() {
            return new WeakHashMap<Stack<?>, WeakOrderQueue>();
        }
    };

    // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
    // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
    private static final class WeakOrderQueue {

        static final WeakOrderQueue DUMMY = new WeakOrderQueue();

        // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
        @SuppressWarnings("serial")
        static final class Link extends AtomicInteger {
            private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

            private int readIndex;
            Link next;
        }

        // This acts as a placeholder for the head Link but will also reclaim space once finalized.
        // It's important this does not hold any reference to either Stack or WeakOrderQueue.
        static final class Head {
            private final AtomicInteger availableSharedCapacity;

            Link link;

            Head(AtomicInteger availableSharedCapacity) {
                this.availableSharedCapacity = availableSharedCapacity;
            }

            /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
            @Override
            protected void finalize() throws Throwable {
                try {
                    super.finalize();
                } finally {
                    Link head = link;
                    link = null;
                    while (head != null) {
                        reclaimSpace(LINK_CAPACITY);
                        Link next = head.next;
                        // Unlink to help GC and guard against GC nepotism.
                        head.next = null;
                        head = next;
                    }
                }
            }

            void reclaimSpace(int space) {
                assert space >= 0;
                availableSharedCapacity.addAndGet(space);
            }

            boolean reserveSpace(int space) {
                return reserveSpace(availableSharedCapacity, space);
            }

            static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
                assert space >= 0;
                for (;;) {
                    int available = availableSharedCapacity.get();
                    if (available < space) {
                        return false;
                    }
                    if (availableSharedCapacity.compareAndSet(available, available - space)) {
                        return true;
                    }
                }
            }
}
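
        // Note (added commentary): availableSharedCapacity is shared by every WeakOrderQueue that feeds the
        // same Stack. It starts at max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY) and is reserved
        // and reclaimed in LINK_CAPACITY-sized chunks, so recycling from foreign threads cannot grow the pool
        // beyond its configured share.
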
        // chain of data items
        private final Head head;
        private Link tail;
        // pointer to another queue of delayed items for the same stack
        private WeakOrderQueue next;
        private final WeakReference<Thread> owner;
        private final int id = ID_GENERATOR.getAndIncrement();

        private WeakOrderQueue() {
            owner = null;
            head = new Head(null);
        }

        private WeakOrderQueue(Stack<?> stack, Thread thread) {
            tail = new Link();

            // It's important that we do not store the Stack itself in the WeakOrderQueue, as the Stack is also used
            // as the key in the WeakHashMap. So just store the enclosed AtomicInteger, which should allow the Stack
            // itself to be GCed.
            head = new Head(stack.availableSharedCapacity);
            head.link = tail;
            owner = new WeakReference<Thread>(thread);
        }

        static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
            final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
            // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
            // may be accessed while it is still being constructed.
            stack.setHead(queue);
            return queue;
        }

        private void setNext(WeakOrderQueue next) {
            assert next != this;
            this.next = next;
        }

        /**
         * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
         */
        static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
            // We allocated a Link so reserve the space
            return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
                    ? newQueue(stack, thread) : null;
        }

        void add(DefaultHandle<?> handle) {
            handle.lastRecycledId = id;

            Link tail = this.tail;
            int writeIndex;
            if ((writeIndex = tail.get()) == LINK_CAPACITY) {
                if (!head.reserveSpace(LINK_CAPACITY)) {
                    // Drop it.
                    return;
                }
                // We allocate a Link so reserve the space
                this.tail = tail = tail.next = new Link();

                writeIndex = tail.get();
            }
            tail.elements[writeIndex] = handle;
            handle.stack = null;
            // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
            // this also means we guarantee visibility of an element in the queue if we see the index updated
            tail.lazySet(writeIndex + 1);
        }

        boolean hasFinalData() {
            return tail.readIndex != tail.get();
        }

        // transfer as many items as we can from this queue to the stack, returning true if any were transferred
        @SuppressWarnings("rawtypes")
        boolean transfer(Stack<?> dst) {
            Link head = this.head.link;
            if (head == null) {
                return false;
            }

            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                this.head.link = head = head.next;
                this.head.reclaimSpace(LINK_CAPACITY);
            }

            final int srcStart = head.readIndex;
            int srcEnd = head.get();
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

            final int dstSize = dst.size;
            final int expectedCapacity = dstSize + srcSize;

            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle element = srcElems[i];
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    this.head.reclaimSpace(LINK_CAPACITY);
                    this.head.link = head.next;
                }

                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                // The destination stack is full already.
                return false;
            }
        }
    }

    static final class Stack<T> {

        // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
        // than the stack owner recycles: when we run out of items in our stack we iterate this collection
        // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
        // still recycling all items.
        final Recycler<T> parent;

        // We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong
        // Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack.
        //
        // The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if
        // the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear
        // it in a timely manner).
        final WeakReference<Thread> threadRef;
        final AtomicInteger availableSharedCapacity;
        final int maxDelayedQueues;

        private final int maxCapacity;
        private final int ratioMask;
        private DefaultHandle<?>[] elements;
        private int size;
        private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;

        Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
              int ratioMask, int maxDelayedQueues) {
            this.parent = parent;
            threadRef = new WeakReference<Thread>(thread);
            this.maxCapacity = maxCapacity;
            availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
            elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
            this.ratioMask = ratioMask;
            this.maxDelayedQueues = maxDelayedQueues;
        }

        // Marked as synchronized to ensure this is serialized.
        synchronized void setHead(WeakOrderQueue queue) {
            queue.setNext(head);
            head = queue;
        }

        int increaseCapacity(int expectedCapacity) {
            int newCapacity = elements.length;
            int maxCapacity = this.maxCapacity;
            do {
                newCapacity <<= 1;
            } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);

            newCapacity = min(newCapacity, maxCapacity);
            if (newCapacity != elements.length) {
                elements = Arrays.copyOf(elements, newCapacity);
            }

            return newCapacity;
        }

@SuppressWarnings({ **"unchecked"**, **"rawtypes"** })
DefaultHandle<T\> pop() {
**int** size = **this**.**size**;
**if** (size == 0) {
**if** (!scavenge()) {
**return null**;
}
size = **this**.**size**;
}
size --;
DefaultHandle ret = **elements**\[size\];
**elements**\[size\] = **null**;
**if** (ret.**lastRecycledId** != ret.**recycleId**) {
**throw new** IllegalStateException(**"recycled multiple times"**);
}
ret.**recycleId** \= 0;
ret.**lastRecycledId** \= 0;
**this**.**size** \= size;
**return** ret;
}
        boolean scavenge() {
            // continue an existing scavenge, if any
            if (scavengeSome()) {
                return true;
            }

            // reset our scavenge cursor
            prev = null;
            cursor = head;
            return false;
        }

        boolean scavengeSome() {
            WeakOrderQueue prev;
            WeakOrderQueue cursor = this.cursor;
            if (cursor == null) {
                prev = null;
                cursor = head;
                if (cursor == null) {
                    return false;
                }
            } else {
                prev = this.prev;
            }

            boolean success = false;
            do {
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }
                WeakOrderQueue next = cursor.next;
                if (cursor.owner.get() == null) {
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    if (cursor.hasFinalData()) {
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }

                    if (prev != null) {
                        prev.setNext(next);
                    }
                } else {
                    prev = cursor;
                }

                cursor = next;
            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }

        void push(DefaultHandle<?> item) {
            Thread currentThread = Thread.currentThread();
            if (threadRef.get() == currentThread) {
                // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
                pushNow(item);
            } else {
                // The current Thread is not the one that belongs to the Stack
                // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
                // happens later.
                pushLater(item, currentThread);
            }
        }

        private void pushNow(DefaultHandle<?> item) {
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            }
            item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

            int size = this.size;
            if (size >= maxCapacity || dropHandle(item)) {
                // Hit the maximum capacity or should drop - drop the possibly youngest object.
                return;
            }
            if (size == elements.length) {
                elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
            }

            elements[size] = item;
            this.size = size + 1;
        }

        private void pushLater(DefaultHandle<?> item, Thread thread) {
            // we don't want to have a ref to the queue as the value in our weak map
            // so we null it out; to ensure there are no races with restoring it later
            // we impose a memory ordering here (no-op on x86)
            Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
            WeakOrderQueue queue = delayedRecycled.get(this);
            if (queue == null) {
                if (delayedRecycled.size() >= maxDelayedQueues) {
                    // Add a dummy queue so we know we should drop the object
                    delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                    return;
                }
                // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
                if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
                    // drop object
                    return;
                }
                delayedRecycled.put(this, queue);
            } else if (queue == WeakOrderQueue.DUMMY) {
                // drop object
                return;
            }

            queue.add(item);
        }

        boolean dropHandle(DefaultHandle<?> handle) {
            if (!handle.hasBeenRecycled) {
                if ((++handleRecycleCount & ratioMask) != 0) {
                    // Drop the object.
                    return true;
                }
                handle.hasBeenRecycled = true;
            }
            return false;
        }
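
        // Worked example (added commentary): with the default io.netty.recycler.ratio of 8, ratioMask is 7,
        // so (++handleRecycleCount & 7) != 0 holds for 7 of every 8 previously-unrecycled handles and only
        // every 8th such handle is kept, matching the RATIO comment in the static initializer. Since
        // handleRecycleCount starts at -1, the very first candidate is always kept.
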
        DefaultHandle<T> newHandle() {
            return new DefaultHandle<T>(this);
        }
}
}