ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java并发工具之并发容器

2020-03-13 18:40:03  阅读:119  来源: 互联网

标签:map Java 容器 并发 线程 key new null public


1. 并发容器概览

简介
ConcurrentHashMap 线程安全的HashMap
CopyOnWriteArrayList 线程安全的List
BlockingQueue 接口,表示阻塞队列,适用于作为数据共享的通道
ConcurrentLinkedQueue 线程安全的LinkedList,非阻塞队列

2. Vector和Hashtable为什么过时?

  • Vector和Hashtable也是为了解决线程安全问题而出现的,但是在多线程高并发下的性能不好;
  • Vector实现了List接口,是线程安全的ArrayList;
  • Hashtable实现了Map接口,是线程安全的HashMap;

下面是Vector的get方法,方法上加入了synchronized关键字,每次调用都获取一个悲观锁,是十分影响性能的;

    public synchronized E get(int index) {
        if (index >= elementCount)
            throw new ArrayIndexOutOfBoundsException(index);

        return elementData(index);
    }

下面是Hashtable的get方法,同样的,synchronized 作用在方法上:

    public synchronized V get(Object key) {
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                return (V)e.value;
            }
        }
        return null;
    }

3. Collections.synchronizedList()与synchronizedMap()解析

List<Object> list = Collections.synchronizedList(new ArrayList<>());
Map<Object, Object> objectObjectMap = Collections.synchronizedMap(new HashMap<>());

Collections.synchronizedList()与Collections.synchronizedMap()可以分别把ArrayList和HashMap变成线程安全的。
下面是Collections.synchronizedList()的get方法,虽然synchronized 不是加在方法上,但是把整个方法体包了起来,没比Vector好多少。

        public E get(int index) {
            synchronized (mutex) {return list.get(index);}
        }

下面是Collections.synchronizedMap()的get方法,同样的用synchronized代码块保证线程安全。

        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }

4. CopyOnWriteArrayList与ConcurrentHashMap简介

  • CopyOnWriteArrayList与ConcurrentHashMap就是用来取代Collections.synchronizedList()与synchronizedMap()的;
  • 因为在绝大多数情况下,这两个性能都更好,除非List经常被修改,Collections.synchronizedList()性能才会好于CopyOnWriteArrayList,因为CopyOnWriteArrayList适合读多写少的场景,每次写入都会完整复制整个List;
  • ConcurrentHashMap没有例外,ConcurrentHashMap在任何情况下都比synchronizedMap()性能好;

5. Map详解

Map是一个接口,分别有以下四个实现:
在这里插入图片描述
1. HashMap是Map的实现,根据key的hashcode存储,访问速度快,允许key为null,线程不安全;
2. Hashtable也是对Map的实现,线程安全;
3. LinkedHashMap继承于HashMap,对HashMap进行了扩展,保持了插入顺序;
4. TreeMap实现了SortedMap,拥有了排序的功能;

Map的常用方法如下:

public class MapDemo {

    public static void main(String[] args) {
        Map<String, Object> map = new HashMap<>();
        System.out.println("map是否为空:" + map.isEmpty());
        map.put("a", 1);
        map.put("b", 2);
        System.out.println(map.keySet());
        System.out.println("map中key是否包含a:" + map.containsKey("a"));
        System.out.println(map.entrySet());
        System.out.println("获取key为b的value:" + map.get("b"));
        System.out.println("map的大小为:" + map.size());
    }
}
map是否为空:true
[a, b]
map中key是否包含a:true
[a=1, b=2]
获取key为b的value:2
map的大小为:2

6. 为什么需要ConcurrentHashMap?

原因如下:

  1. 因为Collections.synchronizedMap()是在每个方法上加synchronized代码块,影响性能;

  2. HashMap不是线程安全的:

    1. 同时put碰撞导致数据丢失,多个线程同时put,计算出来的哈希值一样的话会放到同一个位置,最终只有一个线程成功,其他的数据就丢失了;
    2. 同时put扩容导致数据丢失,多个线程同时put,同时发现需要扩容,扩容后的数组只有一个会保留下来,也会造成丢失;
    3. 死循环造成的CPU100%:只有在JDK1.7及以前的版本会出现死循环,多个线程同时扩容的时候可能对链表的死循环;
    
public class HashMapCPUDemo {

    private static Map<Integer, String> map = new HashMap<>(2, 1.5f);
    public static void main(String[] args) {
        map.put(1, "A");
        map.put(2, "B");
        map.put(3, "C");

        new Thread(() -> {
            map.put(4, "D");
            System.out.println(map);
        }).start();
        new Thread(() -> {
            map.put(5, "E");
            System.out.println(map);
        }).start();
    }
}

7. JDK1.7版本HashMap的结构

在这里插入图片描述
0 1 2 3表示一个一个的桶数组,相同的哈希值就通过链表一直往下链;

7. JDK1.8版本HashMap的结构

在这里插入图片描述
1.8最大的变化是由原来的拉链法升级成红黑树,红黑树对二叉查找树的一种平衡策略,节点左边的值要比右边的值要小,会自动平衡,防止极端不平衡从而影响查找效率的情况。
红黑树有四个特点:
1. 每个节点要么是红色,要么是黑色,但是根节点永远是黑色的;
2. 红色节点不能连续,即红色节点的孩子和父亲都不能是红色;
3. 从任一节点到其子树中每个叶子节点的路径都包含相同数量的黑色节点;
4. 所有的叶子节点都是黑色的;
在这里插入图片描述

8. JDK1.7版本ConcurrentHashMap的结构

在这里插入图片描述

  • segment可以理解为一个块,默认块的数量是16,在块的里面是类似于HashMap的数据结构,采用数组和链表组成的拉链法;
  • 每个块都上了ReentrantLock锁,块之间互不影响,提高了并发效率;
  • 默认偶16个块,所以最多可以同时支持16个线程并发写,一旦初始化以后,是不可以扩容的;

9. JDK1.8版本ConcurrentHashMap的结构

在这里插入图片描述

  • 不再采用segment,改用node节点数组;
  • 采用了和HashMap类似的数据结构,当每个node下有8个及以下的key值时候,会采用拉链法;
  • 当每个node下有8个以上的key值的时候,改为了红黑树;

10. JDK1.8版本ConcurrentHashMap的源码分析

  • put方法源码如下:
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();//不允许key为null
        int hash = spread(key.hashCode());//算出哈希值
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();//判断每个数组是否初始化了,如果没有则进行初始化
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果初始化后,当前位置是空,那么用CAS操作放进去就可以了
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//如果没有成功,判断容器是不是正在扩容,是则帮助扩容
                tab = helpTransfer(tab, f);
            else {//如果槽点有值,就会synchronized上锁,进行链表操作,添加或替换节点
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//走到这里,说明是红黑树,把值放到红黑树中
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {//走到这里,说明完成添加工作,判断bingCount个数,即链表中节点的个数是不是8,达到8且容器容量小于64就转成红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

putVal流程如下:

1. 判断key value不为空;
2. 计算哈希值;
3. 根据对应位置节点的类型来赋值,或是helpTranfer,或是加长链表,或是给红黑树增加节点;
4. 检查满足阈值就编程红黑树;
5. 返回oldVal;
  • get方法源码如下:
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());//算出哈希值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;//如果槽点的哈希值符合,key符合就返回val,说明找到了。
            }
            else if (eh < 0) //如果得出负数,说明是红黑树节点,通过find方法找到val
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {//如果既不是红黑树,也不是第一个节点,说明是个链表,则遍历一遍找个这个val
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get流程如下:

1.计算哈希值;
2.找到对应的位置,根据不同情况:直接取值、红黑树里取值、遍历链表取值;
3.返回找到的结果;

11. 为什么JDK1.8要改变JDK1.7的结构?

  • 1.7采用segment结构,1.8采用数组加链表(红黑树)的结构,提高了并发性,原来的segment默认只有16个,就是说最多只有16个线程并发,而1.8的结构中每个node节点都是独立的;
  • Hash碰撞,1.7是拉链法,1.8先用拉链法,达到条件改为红黑树;
  • 并发安全原理不同:1.7采用分段锁提供线程安全,1.8通过CAS加上synchronized;
  • 提高查询效率,链表中的节点为8的时候转为了红黑树,为什么呢?如下:
     * first values are:
     *
     * 0:    0.60653066
     * 1:    0.30326533
     * 2:    0.07581633
     * 3:    0.01263606
     * 4:    0.00157952
     * 5:    0.00015795
     * 6:    0.00001316
     * 7:    0.00000094
     * 8:    0.00000006
     * more: less than 1 in ten million

作者做了概率实验,当链表节点的个数达到8的时候,发生哈希冲突的概率小于千万分之一,同时红黑树每个节点占用的空间是链表节点占用的空间的两倍,所以不是一开始就是红黑树,默认是链表,占用空间更少,通过这样查询效率和空间占用的考虑,选择了8;

12. ConcurrentHashMap的组合操作:replace

public class ConcurrentHashMapDemo implements Runnable {

    private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            int score =  map.get("a");
            int newScore = score + 1;
            map.put("a", newScore);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        map.put("a", 0);
        Runnable runnable = new ConcurrentHashMapDemo();
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(map.get("a"));
    }

}
1292

我们可以看到,当两个线程同时运行的时候,map中key为a的value进行累加操作的结果不是预期的2000,因为组合操作不是线程安全的,解决线程安全问题可以用synchronized代码块将组合操作包裹起来,但是这种办法性能较差。下面使用ConcurrentHashMap的replace操作保证线程安全:

public class ConcurrentHashMapDemo2 implements Runnable {

    private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            while (true) {
                int score =  map.get("a");
                int newScore = score + 1;
                boolean b = map.replace("a", score, newScore);
                if (b) {
                    break;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        map.put("a", 0);
        Runnable runnable = new ConcurrentHashMapDemo2();
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(map.get("a"));
    }

}

replace操作会返回boolean类型的结果,如果返回false,说明a对应的值不是score,就不进行替换操作,继续进行while循环,直到a的值等于score;如果返回true,说明当map在replace的时候发现a对应的值确实是score并进行替换,然后退出while循环。

13. CopyOnWriteArrayList详解

  • 代替Vectror和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样;
  • Vector和SynchronizedList的锁的粒度太大,要么加在方法上,要么代码块范围包裹整个方法体,并发效率相对比较低,并且迭代时无法编辑;
  • Copy-On-Write并发容器还包括CopyOnWriteArraySet,用来替代ArraySet;
  • 读操作可以尽可能快,而写即使慢一些也没有太大关系;
  • 读多写少:黑名单,每日更新;监听器:迭代操作远多于修改操作;
  • 读写规则:读取完全不加锁,写入也不会阻塞读取操作。只有写写之间需要进行同步等待;
  • CopyOnWrite含义:往容器中添加元素的时候,不是直接添加,而是将容器先拷贝一份副本,在副本去添加删除等操作,再把容器的引用指向新的,这样读操作就是读旧的容器,读操作也就不受影响;
  • 创建新副本、读写分离;
  • 旧的容器不可变,完全可以并发的读操作;

14. CopyOnWriteArrayList的缺点

· 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,马上能读到,请不要使用CopyOnWrite容器;
内存占用问题:因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存会同时拥有两个对象的内存;

15. CopyOnWriteArrayList源码分析

add方法分析:

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();//先上锁
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝一份数组副本
            newElements[len] = e;//把要添加的新元素添加进去
            setArray(newElements);//把引用指向新的数组
            return true;
        } finally {
            lock.unlock();
        }
    }

get方法分析:

    private E get(Object[] a, int index) {
        return (E) a[index];
    }

直接返回对应索引的值,读取的时候没有加锁。

16. 并发队列:阻塞队列

  • 为什么使用队列:

    1. 用队列可以在线程间传递数据:生产者消费者模式;
    2. 考虑锁等线程安全问题的重任从“你”转移到队列上;
  • Queue保存一组数据,底层是LinkedList;

  • BlockingQueue阻塞的Queue,体现了生产者消费者模式;

17. 阻塞队列BlockingQueue

  1. 阻塞队列是具有阻塞功能的队列,首先是一个队列,其次是具有阻塞功能;
  2. 通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以使多线程的;
  3. 是否有界:无界队列意味着里面可以容纳非常多;
  4. 阻塞队列和线程池的关系:阻塞队列是线程池的重要组成部分;
  5. take方法:获取并移除队列的头结点,一旦如果执行take的时候,队列里无数据,则阻塞,直到队列里面有数据;
  6. put方法:插入元素。但是如果队列已满,那么就无法继续插入,则阻塞,知道队列里有了空闲空间;

18. ArrayBlockingQueue详解

  • 有界;
  • 指定容量;
  • 公平:指定是否需要保证公平,如果想保证公平的话,那么等待最长时间的线程会被优先处理,不过这会同时带来一定的性能损耗;
  • 使用案例:面试官面试,生产者消费者问题
public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        ArrayBlockingQueue queue = new ArrayBlockingQueue<String>(3);
        Interviewer r1 = new Interviewer(queue);
        Consumer r2 = new Consumer(queue);
        new Thread(r1).start();
        new Thread(r2).start();
    }
}

class Interviewer implements Runnable {
    BlockingQueue<String> queue;

    public Interviewer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            String candidate = "候选人" + i;
            try {
                queue.put(candidate);
                System.out.println(candidate + "正在等待...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            queue.put("stop");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    BlockingQueue<String> queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000 );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String msg;
        try {
            while (!(msg = queue.take()).equals("stop")) {
                System.out.println(msg + "面试完了");
            }
            System.out.println("所有候选人都面试完了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

put方法分析:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);//判断是不是Null
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//给锁住,并能响应中断
        try {
            while (count == items.length)
                notFull.await();//发现满了,就线程等待
            enqueue(e);//入队
        } finally {
            lock.unlock();
        }
    }

take方法分析:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

19. 并发容器总结

  1. 并发队列分为阻塞队列BlockingQueue和非阻塞队列ConcurrentBlockingQueue;
  2. 非阻塞队列使用CAS实现的,不是阻塞实现的;
  3. 并发容器有三类:Concurrent*、CopyOnWrite*、Blocking*;
  4. Concurrent*的特点是通过CAS实现并发,而CopyOnWrite则通过赋值一份原数据来实现的,Blocking通过AQS实现的;

标签:map,Java,容器,并发,线程,key,new,null,public
来源: https://blog.csdn.net/qq_36986015/article/details/104818833

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有