/**
 * The synchronization state.
 */
private volatile int state;
/** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protectedfinalintgetState() { return state; }
/** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protectedfinalvoidsetState(int newState) { state = newState; }
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protectedfinalbooleancompareAndSetState(int expect, int update) { return U.compareAndSetInt(this, STATE, expect, update); }
// Node status bits, also used as argument and return values staticfinalintWAITING=1; // must be 1 staticfinalintCANCELLED=0x80000000; // must be negative staticfinalintCOND=2; // in a condition wait
abstractstaticclassNode { volatile Node prev; // initially attached via casTail volatile Node next; // visibly nonnull when signallable Thread waiter; // visibly nonnull when enqueued volatileint status; // written by owner, atomic bit ops by others // 略 }
/** * Head of the wait queue, lazily initialized. */ privatetransientvolatile Node head;
/** * Tail of the wait queue. After initialization, modified only via casTail. */ privatetransientvolatile Node tail;
/** * Enqueues the node unless null. (Currently used only for * ConditionNodes; other cases are interleaved with acquires.) */ finalvoidenqueue(Node node) { if (node != null) { for (;;) { Nodet= tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) // initialize tryInitializeHead(); elseif (casTail(t, node)) { t.next = node; if (t.status < 0) // wake up to clean link LockSupport.unpark(node.waiter); break; } } } }
Semaphore
Semaphore 就是基于 AQS 的一个实现,从它的源码就能很容易看出来:它内部就是通过 AQS 的 state 来管理 permits。
/** * Fair version */ staticfinalclassFairSyncextendsSync { privatestaticfinallongserialVersionUID=2014338818796000944L;
FairSync(intpermits) { super(permits); }
protectedinttryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; intavailable= getState(); intremaining= available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
/**
 * Creates a {@code Semaphore} with the given number of
 * permits and nonfair fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
/**
 * Creates a {@code Semaphore} with the given number of
 * permits and the given fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 * @param fair {@code true} if this semaphore will guarantee
 *        first-in first-out granting of permits under contention,
 *        else {@code false}
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
} // closes an enclosing declaration opened before this excerpt