AQS(AbstractQueuedSynchronizer)框架之ReentrantLock

park 与 unPark 使用

ReentrantLock的实现使用的就是 park + 自旋的方式 ,下面举个例子来了解下 park 和 unpark 方法

public static void main(String[] args) throws InterruptedException {    log.debug("1");    Thread t1 = new Thread(()->{        log.debug("2");        LockSupport.park();//让当前线程阻塞        log.debug("4");    });    //告诉cpu t1 当前可调度;具体什么时候调度是由操作系统决定的    t1.start();    //让主线程睡眠2秒    TimeUnit.SECONDS.sleep(2);    log.debug("3");    LockSupport.unpark(t1);//唤醒t1线程}

打印结果

手写原始方式实现锁

锁,其实就是一个标识,当这个标识改变成了某个状态我们理解为获得锁

public class CustomerLock {    // status = 1;  不是原子性的,这是一条java代码 编译后会分为三条指令 gets=0  set1cache=1 set2    // 执行set1时 只是在当前线程操作 将status改为1 此时只是在内存中操作,还未同步到主存当中。    // 就是说线程t1修改status值还未同步写入主存时 t2也执行了相同操作,此时t2拿到的status还是0    volatile int status = 0;    //实例化这个类 ①为了调用cas方法 ②获取status变量的偏移量    private static Unsafe unsafe = null;    //CustomerLock当中status变量的内存偏移量    private static long statusOffset;    //获取 Unsafe对象    static {        Field singleOneInstanceField = null;        try{            singleOneInstanceField = Unsafe.class.getDeclaredField("theUnsafe");            singleOneInstanceField.setAccessible(true);            unsafe = (Unsafe) singleOneInstanceField.get(null);            statusOffset = unsafe.objectFieldOffset(                    com.zld.cloud.CustomerLock.class.getDeclaredField("status"));        }catch (Exception e){            e.printStackTrace();        }    }    void lock() throws InterruptedException{        while (!compareAndSet(0,1)){            TimeUnit.SECONDS.sleep(5);        }    }    void unLock(){        status = 0;    }    private boolean compareAndSet(int oldVal, int newVal) {        return unsafe.compareAndSwapInt(this,statusOffset,0,1);    }}
public static void main(String[] args) throws InterruptedException {    CustomerLock customerLock = new CustomerLock();    Thread t1 = new Thread(()->{        try {            customerLock.lock();        } catch (InterruptedException e) {            throw new RuntimeException(e);        }        log.debug("1");        log.debug("1");        log.debug("1");        log.debug("1");        log.debug("1");        customerLock.unLock();    },"t1");    Thread t2 = new Thread(()->{        try {            customerLock.lock();        } catch (InterruptedException e) {            throw new RuntimeException(e);        }        log.debug("2");        log.debug("2");        log.debug("2");        log.debug("2");        log.debug("2");        customerLock.unLock();    },"t2");    t1.start();    t2.start();}

测试结果截图

ReentrantLock 源码分析

public class ReentrantLock implements Lock, java.io.Serializable {    private static final long serialVersionUID = 7373984872572414699L;    /** Synchronizer providing all implementation mechanics */    private final Sync sync;    ...   }

ReentrantLock 中有个 抽象类Sync,这个类有两个实现 FairSync 公平锁 和 NonfairSync 非公平锁

abstract static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = -5179523762034025860L;    /**     * Performs {@link Lock#lock}. The main reason for subclassing     * is to allow fast path for nonfair version.     */    abstract void lock();    /**     * Performs non-fair tryLock.  tryAcquire is implemented in     * subclasses, but both need nonfair try for trylock method.     */    final boolean nonfairTryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        if (c == 0) {            if (compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        }        else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            if (nextc < 0) // overflow                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }    protected final boolean tryRelease(int releases) {        int c = getState() - releases;        if (Thread.currentThread() != getExclusiveOwnerThread())            throw new IllegalMonitorStateException();        boolean free = false;        if (c == 0) {            free = true;            setExclusiveOwnerThread(null);        }        setState(c);        return free;    }    protected final boolean isHeldExclusively() {        // While we must in general read state before owner,        // we don't need to do so to check if current thread is owner        return getExclusiveOwnerThread() == Thread.currentThread();    }    final ConditionObject newCondition() {        return new ConditionObject();    }    // Methods relayed from outer class    final Thread getOwner() {        return getState() == 0 ? null : getExclusiveOwnerThread();    }    final int getHoldCount() {        return isHeldExclusively() ? getState() : 0;    }    final boolean isLocked() {        return getState() != 0;    }    /**     * Reconstitutes the instance from a stream (that is, deserializes it).     */    private void readObject(java.io.ObjectInputStream s)        throws java.io.IOException, ClassNotFoundException {        s.defaultReadObject();        setState(0); // reset to unlocked state    }}

ReentrantLock 是可重入锁,当state > 0 说明锁被持有

公平锁的实现

static final class FairSync extends Sync {    private static final long serialVersionUID = -3000897897090466540L;    final void lock() {        acquire(1); //标识加锁成功后需要改变的状态    }}

aquire方法

public final void acquire(int arg) {    //尝试加锁 如果加锁失败则 调用 acquireQueued 方法加入队列排队    //加入队列后会立刻park,直到解锁调用unpark醒来判断自己是否被打断    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

tryAcquire 方法

protected final boolean tryAcquire(int acquires) {    final Thread current = Thread.currentThread();//拿到当前线程    int c = getState();//获取lock对象的锁状态  0为自由状态 1为上锁 大于1表示重入    if (c == 0) { // 如果锁是自由状态        //判断当前是否需要排队,如果不需要则进行cas操作尝试加锁        if (!hasQueuedPredecessors() &&            compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);//如果加锁成功,将当前线程设置为拥有锁的线程            return true;        }    }    //锁不是自由状态 但当前线程是持锁线程 表示重入则 状态值 +1    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0)            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}

判断是否需要排队

public final boolean hasQueuedPredecessors() {    Node t = tail;     Node h = head;    Node s;    return h != t &&        ((s = h.next) == null || s.thread != Thread.currentThread());}

这里会有不需要排队的两种情况

  • 队列没有初始化不需要排队,直接去加锁但可能会失败假设 t1 t2同时lock,t1 t2均查询到此时队列没有初始化认为不需要排队,同时进行cas修改操作,原子性操作必定有一个会失败,失败的将去排队
  • 队列已初始化,队列中的node大于1,假设tc加锁时发现队列中第一个就是自身,例如重入队列中第一个排队的线程进入队列后会尝试获取锁,如果此时持锁线程释放了锁则直接获取锁执行,如果没有释放那就排队 这里的判断由两部分组成
  • 队首不等于队尾三种情况队列没有初始化 直接上锁队列已经初始化了 假设h!=t成立(队首不等于队尾的情况一定是队列node个数大于1的情况,队列中只有一个node时不算排队 head节点不参与排队,它要么时持锁node,要么时虚拟头节点),队列中线程大于1个 s==null不成立 那么判断 s.thread != Thread.currentThread() 分两种情况,此时head持锁线程正在做事情,不知道有没有做完当前参与竞争锁的线程不是第一个排队线程,返回true需要排队当前参与竞争锁的线程就是第一个排队的线程,返回false不需排队,尝试加锁尝试加锁时,head节点 也就是持锁线程释放锁 则 加锁成功 返回最顶层aquire成功尝试加锁失败,head节点还没有释放锁
  • 队列已初始化但是里面只有一个数据队列初始化时会虚拟一个头节点h,AQS默认 h 是不参与排队的,当队列中的第一个排队node得到锁会将自己设置为h 头节点,将自身线程设置为持锁线程。因此队列已初始化但只有一个节点并且这个节点就是持锁线程 那么h!=t 就不成立直接返回false不需要排队。

acquireQueued(addWaiter(Node.exclusive),arg))方法解析,两种情况

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}
- first节点持有了锁(将自己改成head)未释放,curr节点(此时是first)尝试加锁失败仍需要排队- 其他线程抢占了锁,队列中有节点排队,curr(不是first,它前面还有其他node)跟着排队

入队

private Node addWaiter(Node mode) {    //将当前线程封装成node对象    Node node = new Node(Thread.currentThread(), mode);    //将尾节点赋值给pred    Node pred = tail;    if (pred != null) { //如果队尾不为null 说明队列已初始化        node.prev = pred;//设置当前节点的上个节点是原队列的尾节点        if (compareAndSetTail(pred, node)) {//cas操作防止多线程加锁,确保当前节点入队时的原子操作            pred.next = node;//将当前线程node设置为原队列尾节点的下一个节点            return node;//返回当前线程节点对象        }    }    enq(node);//队列还没初始化    return node;//返回当前线程节点对象}

初始化队列

private Node enq(final Node node) {    //死循环    for (;;) {        Node t = tail;//每次循环队尾节点赋值给t        if (t == null) { //第一次循环时队尾肯定为null            //调用无参构造方法实例化node对象  属性都为null            if (compareAndSetHead(new Node()))                //此时队列中只有一个新的虚拟node 使队列(链表)成立设置头尾相等                tail = head;        } else {//第二次循环t一定不为null            node.prev = t; // 将当前线程节点对象 放到尾节点(第二次循环尾节点就是头节点)之后            if (compareAndSetTail(t, node)) {//将当前线程对象node入队 并设置为队尾                 t.next = node;//维护好链表设置头节 原来队尾下一个节点尾当前线程节点                return t;//返回队尾节点,即终止循环            }        }    }}

acquireQueued方法的源码分析

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {//死循环,走进这个方法node已经完成了入队            //获得当前线程node的上个node有两种情况,1、p为head节点。2、p不为head            final Node p = node.predecessor();            //如果p是head,那么当前线程对象node 为first, 尝试加锁,其实就是想看下head节点释放锁了没            if (p == head && tryAcquire(arg)) {                setHead(node);//原head释放了锁,加锁成功将自身设置为head                p.next = null; //原head的下一节点置空 因为当前node成为head                failed = false;//标识 当前node加锁成功时为false                return interrupted;//返回默认值            }            //这里分两种情况 1、上个节点不是head 2、上个节点是head但未释放锁 修改上个节点状态park            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())//自身park                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

Synchronized 1.6 之前对任何情况的处理相差不多,效率比较低下 ---重量级锁 并发编程之父Doug Lea 写了ReentrantLock

ReentrantLock的四种情况

ReentrantLock的性能比 synchronized 高

  • 从头到尾第一个线程t0获取锁的时候代价基本为0
    ReentrantLock加锁最大的开销在于 compareAndSetState(0,acquires) cas操作指令,cpu底层提供的硬件指令,跟语言无关synchronize 只有一个线程时是偏向锁
  • 多线程之间顺序执行(t0执行完,t1执行)没有竞争执行,代价几乎为0synchronize 此时升级为轻量级锁
  • 如果有线程竞争执行直到第二个线程自旋完 当前线程还没有释放锁第二个线程来加锁时没释放锁,但是入队自旋之后第一个线程释放了锁
  • t0持有锁不释放,t1尝试加锁失败,t2来加锁

公平锁与非公平锁区别

当锁是自由状态是 公平锁需要判断自己是否要排队,而非公平锁直接cas操作,当锁不是自由状态非公平锁和公平锁没有区别,非公平锁的效率比公平锁高
sychronized 只支持公平锁
有的人认为公平锁就是排队非公平锁是插队,这种说法是不完全正确的。 实际上非公平锁入队之后是无法插队的 只能在入队之前的两次抢锁过程种可以算插队。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章