`
hehaibo
  • 浏览: 409886 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

volatile拓展文章-- JAVA高级多核线程volatile原理与技巧(转)

 
阅读更多

原文出处:http://wk.baidu.com/view/bc890df5f61fb7360b4c654b?ssid=&from=&bd_page_type=1&uid=C243B7807C9E6422D339FC22E369011A&st=1&pu=sl%401%2Cpw%404500%2Csz%40224_220%2Cpd%401%2Cfz%402%2Clp%406%2Ctpl%40color&set=co#page

 

代码示例有改动。


为什么使用 volatile 比同步代价更低? 同步的代价, 主要由其覆盖范围决定, 如果可以降低同步的覆盖范围, 则可以大幅提升程序性能。
volatile 的覆盖范围仅仅变量级别的,因此它的同步代价很低。

 

volatile 原理是什么?

 

volatile 的语义, 其实是告诉处理器, 不要将我放入工作内存, 请直接在主存操作我。(工作内存详见 java 内存模型) 因此, 当多核或多线程在访问该变量时, 都将直接操作主存, 这从本质上, 做到了变量共享。


volatile 的有什么优势?
1, 更大的程序吞吐量
2, 更少的代码实现多线程
3, 程序的伸缩性较好
4, 比较好理解, 无需太高的学习成本

 

volatile 有什么劣势?
1, 容易出问题
2, 比较难设计 volatile 运算存在脏数据问题,volatile 仅仅能保证变量可见性, 无法保证原子性.


volatile 的 race condition 示例:

public class TestRaceCondition {
 private volatile int i = 0;

 public void increase() {
  i++;
 }

 public int getValue() {
  return i;
 }
 /**
  * 测试方法
  * @param args
  */
 public static void main(String[] args) {
  final TestRaceCondition c =new TestRaceCondition();
  for (int i =0;i<100;i++) {
   new Thread("thread"+i){
    public void run(){
     c.increase();
    }
   }.start();
  }
  System.out.println("期望的值:100");
  System.out.println("实际的值:"+c.getValue());
 }
}
输出结果:期望的值:100
实际的值:63
 

 

 

当多线程执行 increase 方法时, 是否能保证它的值会是线性递增的呢?

答案是否定的。原因: 这里的 increase 方法, 执行的操作是 i++, 即 i = i + 1; 针对 i = i + 1,
在多线程中的运算, 本身需要改变 i 的值. 如果, 在 i 已从内存中取到最新值, 但未与 1 进行运算, 此时其他线程已数次将运算结果赋值给 i. 则当前线程结束时, 之前的数次运算结果都将被覆盖。即执行100 次 increase, 可能结果是 < 100. 一般来说, 这种情况需要较高的压力与并发情况下, 才会出现。如示例所输出的结果一样。


如何避免这种情况? 解决以上问题的方法:
第一种方式是 操作时, 加上同步,这种方法, 无疑将大大降低程序性能, 且违背了 volatile 的初衷。

第二种方式是, 使用硬件原语(CAS), 实现非阻塞算法 从 CPU 原语上, 支持变量级别的低开销同步。CPU 原语-比较并交换(CompareAndSet),实现非阻塞算法。
什么是 CAS? cas 是现代 CPU 提供给并发程序使用的原语操作. 不同的 CPU 有不同的使用规范. 在 Intel 处理器中,比较并交换通过指令的 cmpxchg 系列实现。
PowerPC 处理器有一对名为"加载并保留"和"条件存储"的指令,它们实现相同的目地; MIPS 与 PowerPC 处理器相似,除了第一个指令称为"加载链接"。
CAS 操作包含三个操作数 —— 内存位置(V),预期原值(A)和新值(B) 什么是非阻塞算法? 一个线程的失败或挂起不应该影响其他线程的失败或挂起。
这类算法称之为非阻塞(non blocking)算法 对比阻塞算法: 如果有一类并发操作, 其中一个线程优先得到对象监视器的锁, 当其他线程到达同步 边界时, 就会被阻塞。
直到前一个线程释放掉锁后, 才可以继续竞争对象锁.(当然,这里的竞争也可是公平的, 按先来后到的次序)
CAS 原理: 我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更 改该位置,只告诉我这个位置现在的值即可.
CAS 使用示例(jdk 1.5 并发包 AtomicInteger 类分析

public class AtomicInteger extends Number implements java.io.Serializable {
  private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final int getAndSet(int newValue) {
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
  return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
  /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
     * and does not provide ordering guarantees, so is only rarely an
     * appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(int expect, int update) {
 return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final int getAndDecrement() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int delta) {
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final int decrementAndGet() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int delta) {
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return next;
        }
    }
 //......

}


这个方法是, AtomicInteger 类的常用方法, 作用是, 将变量设 置为指定值, 并返回设置前的值. 它利用了 cpu 原语 compareAndSet 来保障值的唯一性.
另, AtomicInteger 类中, 其他的实用方法, 也是基于同样的实现方式. 比如 getAndIncrement, getAndDecrement, getAndAdd 等等.

CAS 语义上存在的"ABA 问题" 什么是 ABA 问题? 假设, 第一次读取 V 地址的 A 值, 然后通过 CAS 来判断 V 地址的值是否仍旧为 A, 如 果是, 就将 B 的值写入 V 地址,覆盖 A 值.
但是, 语义上, 有一个漏洞, 当第一次读取 V 的 A 值, 此时, 内存 V 的值变为 B 值, 然后在未执行 CAS 前, 又变回了 A 值. 此时, CAS 再执行时, 会判断其正确的, 并进行赋值.
这种判断值的方式来断定内存是否被修改过, 针对某些问题, 是不适用的. 为了解决这种问题, jdk 1.5 并发包提供了 AtomicStampedReference(有标记的原子引 用)类,
通过控制变量值的版本来保证 CAS 正确性. 其实, 大部分通过值的变化来 CAS, 已经够用了.

jdk1.5 原子包介绍(基于 volatile) 包的特色:
1, 普通原子数值类型 AtomicInteger, AtomicLong 提供一些原子操作的加减运算.
2, 使用了解决脏数据问题的经典模式-"比对后设定", 即 查看主存中数据是否与预期 提供的值一致,如果一致,才更新.
3, 使用 AtomicReference 可以实现对所有对象的原子引用及赋值.包括 Double 与 Floa t, 但不包括对其的计算.浮点的计算,只能依靠同步关键字或 Lock 接口来实现了.
4, 对数组元素里的对象,符合以上特点的, 也可采用原子操作.包里提供了一些数组原 子操作类 AtomicIntegerArray, AtomicLongArray 等等.
5, 大幅度提升系统吞吐量及性能.

本文详细介绍[JAVA100 例]065,线程同步

/**<p>Title: 线程同步</p> 
 * <p>Description: 通过使用同步锁实现对共享数据的操作</p>
 * <p>Copyright: Copyright (c) 2003</p> * <p>Filename: SyThreadDemo.java</p> 
 * @version 1.0 */

/**
 * *<br>
 * 类说明:主程序 *<br>
 * 功能描述:构造两个线程,并启动它们
 */
public class SyThreadDemo {
 public static void main(String[] args) {
  Trade ft = new Trade();
  addThread tt1 = new addThread(ft, "add");
  decThread tt2 = new decThread(ft, "dec");
  tt1.start();
  tt2.start();
 }
}

/**
 * *<br>
 * 类说明:同步类 *<br>
 * 功能描述:保存共享数据,
 */
class Trade {
 private String transName;
 private double amount;

 /**
  * *<br>
  * 方法说明:更新数据 *<br>
  * 输入参数:String transName 操作名称 *<br>
  * 输入参数:double amount 资金数量 *<br>
  * 返回类型:
  */
 synchronized void update(String transName, double amount) {
  this.transName = transName;
  this.amount = amount;
  System.out.println(this.transName + " " + this.amount);
 }
}

/**
 * *<br>
 * 类说明:添加资金 *<br>
 * 功能描述:单线程,调用 trade.update()方法,修改数据
 */
class addThread extends Thread {
 private Trade ft;

 addThread(Trade ft, String name) {
  super(name);
  this.ft = ft;
 }

 public void run() {
  for (int i = 0; i < 10; i++)
   ft.update("add", 2000.0);
 }
}

/**
 * *<br>
 * 类说明:减少资金 *<br>
 * 功能描述:单线程,调用 trade.update()方法,修改数据
 */
class decThread extends Thread {
 private Trade fd;

 decThread(Trade fd, String name) {
  super(name);
  this.fd = fd;
 }

 /**
  * *<br>
  * 方法说明:线程主体 *<br>
  * 输入参数: *<br>
  * 返回类型:
  */
 public void run() {
  for (int i = 0; i < 10; i++)
   fd.update("dec", -2000.0);
 }
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics