科技行者

行者学院 转型私董会 科技行者专题报道 网红大战科技行者

知识库

知识库 安全导航

至顶网软件频道基础软件Java 5.0多线程编程(4)

Java 5.0多线程编程(4)

  • 扫一扫
    分享文章到微信

  • 扫一扫
    关注官方公众号
    至顶头条

CountDownLatch是个计数器,它有一个初始数,等待这个计数器的线程必须等到计数器倒数到零时才可继续。

作者:中国IT实验室 来源:中国IT实验室 2007年8月28日

关键字: 多线程 java

  • 评论
  • 分享微博
  • 分享邮件

CountDownLatch:

   CountDownLatch是个计数器,它有一个初始数,等待这个计数器的线程必须等到计数器倒数到零时才可继续。比如说一个Server启动时需要初始化4个部件,Server可以同时启动4个线程去初始化这4个部件,然后调用CountDownLatch(4).await()阻断进入等待,每个线程完成任务后会调用一次CountDownLatch.countDown()来倒计数, 4个线程都结束时CountDownLatch的计数就会降低为0,此时Server就会被唤醒继续下一步操作。CountDownLatch的方法主要有:

  • await():使调用此方法的线程阻断进入等待
  • countDown(): 倒计数,将计数值减1
  • getCount(): 得到当前的计数值

   CountDownLatch的例子:一个server调了三个ComponentThread分别去启动三个组件,然后server等到组件都启动了再继续。

public class Server {

      public static void main(String[] args) throws InterruptedException{

            System.out.println("Server is starting.");

            //初始化一个初始值为3CountDownLatch

            CountDownLatch latch = new CountDownLatch(3);

            //3个线程分别去启动3个组件

            ExecutorService service = Executors.newCachedThreadPool();

            service.submit(new ComponentThread(latch, 1));

            service.submit(new ComponentThread(latch, 2));

            service.submit(new ComponentThread(latch, 3));

            service.shutdown();

            //进入等待状态

            latch.await();

            //当所需的三个组件都完成时,Server就可继续了

            System.out.println("Server is up!");

      }

}

 

public class ComponentThread implements Runnable{

      CountDownLatch latch;

      int ID;

      /** Creates a new instance of ComponentThread */

      public ComponentThread(CountDownLatch latch, int ID) {

            this.latch = latch;

            this.ID = ID;

      }

      public void run() {

            System.out.println("Component "+ID + " initialized!");

            //将计数减一

            latch.countDown();

      }    

}

运行结果:

Server is starting.

Component 1 initialized!

Component 3 initialized!

Component 2 initialized!

Server is up!

CyclicBarrier:

   CyclicBarrier类似于CountDownLatch也是个计数器,不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数,当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。CyclicBarrier就象它名字的意思一样,可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。CyclicBarrier初始时还可带一个Runnable的参数,此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

CyclicBarrier提供以下几个方法:

  • await():进入等待
  • getParties():返回此barrier需要的线程数
  • reset():将此barrier重置

   以下是使用CyclicBarrier的一个例子:两个线程分别在一个数组里放一个数,当这两个线程都结束后,主线程算出数组里的数的和(这个例子比较无聊,我没有想到更合适的例子)

public class MainThread {

public static void main(String[] args)

      throws InterruptedException, BrokenBarrierException, TimeoutException{

            final int[] array = new int[2];

            CyclicBarrier barrier = new CyclicBarrier(2,

                  new Runnable() {//在所有线程都到达Barrier时执行

                  public void run() {

                        System.out.println("Total is:"+(array[0]+array[1]));

                  }

            });           

            //启动线程

            new Thread(new ComponentThread(barrier, array, 0)).start();

            new Thread(new ComponentThread(barrier, array, 1)).start();   

      }     

}

 

public class ComponentThread implements Runnable{

      CyclicBarrier barrier;

      int ID;

      int[] array;

      public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {

            this.barrier = barrier;

            this.ID = ID;

            this.array = array;

      }

      public void run() {

            try {

                  array[ID] = new Random().nextInt();

                  System.out.println(ID+ " generates:"+array[ID]);

                  //该线程完成了任务等在Barrier

                  barrier.await();

            } catch (BrokenBarrierException ex) {

                  ex.printStackTrace();

            } catch (InterruptedException ex) {

                  ex.printStackTrace();

            }

      }

}

Exchanger:

   顾名思义Exchanger让两个线程可以互换信息。用一个例子来解释比较容易。例子中服务生线程往空的杯子里倒水,顾客线程从装满水的杯子里喝水,然后通过Exchanger双方互换杯子,服务生接着往空杯子里倒水,顾客接着喝水,然后交换,如此周而复始。

class FillAndEmpty {

      //初始化一个Exchanger,并规定可交换的信息类型是DataCup

      Exchanger exchanger = new Exchanger();

      Cup initialEmptyCup = ...; //初始化一个空的杯子

      Cup initialFullCup = ...; //初始化一个装满水的杯子

      //服务生线程

      class Waiter implements Runnable {

            public void run() {

                  Cup currentCup = initialEmptyCup;

                  try {

                        //往空的杯子里加水

                        currentCup.addWater();

                        //杯子满后和顾客的空杯子交换

                        currentCup = exchanger.exchange(currentCup);

                  } catch (InterruptedException ex) { ... handle ... }

            }

      }

      //顾客线程

      class Customer implements Runnable {

            public void run() {

                  DataCup currentCup = initialFullCup;

                  try {

                        //把杯子里的水喝掉

                        currentCup.drinkFromCup();

                        //将空杯子和服务生的满杯子交换

                        currentCup = exchanger.exchange(currentCup);

                  } catch (InterruptedException ex) { ... handle ...}

            }

      }

     

      void start() {

            new Thread(new Waiter()).start();

            new Thread(new Customer()).start();

      }

}

6: BlockingQueue接口

   BlockingQueue是一种特殊的Queue,若BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态直到BlocingkQueue进了新货才会被唤醒。同样,如果BlockingQueue是满的任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有新的空间才会被唤醒继续操作。BlockingQueue提供的方法主要有:

  • add(anObject): anObject加到BlockingQueue里,如果BlockingQueue可以容纳返回true,否则抛出IllegalStateException异常。
  • offer(anObject):把anObject加到BlockingQueue里,如果BlockingQueue可以容纳返回true,否则返回false
  • put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,调用此方法的线程被阻断直到BlockingQueue里有新的空间再继续。
  • poll(time):取出BlockingQueue里排在首位的对象,若不能立即取出可等time参数规定的时间。取不到时返回null
  • take():取出BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。

根据不同的需要BlockingQueue4种具体实现:

  • ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
  • LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO(先入先出)顺序排序的。LinkedBlockingQueueArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue
  • PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
  • SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。

下面是用BlockingQueue来实现ProducerConsumer的例子:

public class BlockingQueueTest {

      static BlockingQueue basket;

      public BlockingQueueTest() {

            //定义了一个大小为2BlockingQueue,也可根据需要用其他的具体类

            basket = new ArrayBlockingQueue(2);

      }

      class Producor implements Runnable {

            public void run() {

                  while(true){

                        try {

                              //放入一个对象,若basket满了,等到basket有位置

                              basket.put("An apple");

                        } catch (InterruptedException ex) {

                              ex.printStackTrace();

                        }

                  }

            }

      }

      class Consumer implements Runnable {

            public void run() {

                  while(true){

                        try {

                              //取出一个对象,若basket为空,等到basket有东西为止

                              String result = basket.take();

                        } catch (InterruptedException ex) {

                              ex.printStackTrace();

                        }

                  }

            }           

      }

      public void execute(){

            for(int i=0; i<10; i++){

                  new Thread(new Producor()).start();

                  new Thread(new Consumer()).start();

            }           

      }

      public static void main(String[] args){

            BlockingQueueTest test = new BlockingQueueTest();

            test.execute();

      }     

}

7Atomics 原子级变量

   原子量级的变量,主要的类有AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……。这些原子量级的变量主要提供两个方法:

  • compareAndSet(expectedValue, newValue): 比较当前的值是否等于expectedValue,若等于把当前值改成newValue,并返回true。若不等,返回false
  • getAndSet(newValue): 把当前值改为newValue,并返回改变前的值。

   这些原子级变量利用了现代处理器(CPU)的硬件支持可把两步操作合为一步的功能,避免了不必要的锁定,提高了程序的运行效率。

8Concurrent Collections 共点聚集

   在Java的聚集框架里可以调用Collections.synchronizeCollection(aCollection)将普通聚集改变成同步聚集,使之可用于多线程的环境下。 但同步聚集在一个时刻只允许一个线程访问它,其它想同时访问它的线程会被阻断,导致程序运行效率不高。Java 5.0里提供了几个共点聚集类,它们把以前需要几步才能完成的操作合成一个原子量级的操作,这样就可让多个线程同时对聚集进行操作,避免了锁定,从而提高了程序的运行效率。Java 5.0目前提供的共点聚集类有:ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayListCopyOnWriteArraySet.

查看本文来源
    • 评论
    • 分享微博
    • 分享邮件
    邮件订阅

    如果您非常迫切的想了解IT领域最新产品与技术信息,那么订阅至顶网技术邮件将是您的最佳途径之一。

    重磅专题
    往期文章
    最新文章