JUC并发编程
线程和进程
java 不可以 开启线程
并发、并行
并发:(多线程操作同一资源) – 充分利用CPU资源
并行:(多线程同时跑 – 线程池 )
Runtime.getRuntime().availableProcessors()
线程状态 Thread.State
public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
wait / sleep 区别
1、来自不同的类
wait => Object
sleep => Thread
2、关于锁的释放
wait 会释放锁,sleep一直抱着锁
3、使用范围
wait : 必须在同步代码块、同步方法中
sleep : 任意地方
4、捕获异常
都需要捕获异常
Lock锁
synchronized
Lock
创建锁:Lock lock = new ReentrantLock();
try 前上锁,finall 解锁
公平锁:公平,可以先来后到
非公平锁:不公平,允许插队【默认】
synchronized 与 Lock 区别
1、Synchronized 内置的ava关键字,Lock是一个java类
2、Synchronized 无法判断获取锁的状态,Lock可以判断是否获取到了锁
3、Synchronized 会自动释放锁,lock必须要手动释放锁!如果不释放锁,死锁
4、Synchronized 线程1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去;
5、Synchronized 可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置);
6、Synchronized 适合锁少量的代码同步问题,Lock适合锁大量的同步代码!
生产者和消费者问题
synchronized版
DataC date = new DataC ();new Thread (()->{ for (int i = 0 ; i < 10 ; i++) {date.producer();}},"A" ).start();new Thread (()->{ for (int i = 0 ; i < 10 ; i++) {date.producer();}},"AA" ).start();new Thread (()->{ for (int i = 0 ; i < 10 ; i++) {date.consumer();}},"B" ).start();new Thread (()->{ for (int i = 0 ; i < 10 ; i++) {date.consumer();}},"BB" ).start();class DataC { private int number = 0 ; public synchronized void producer () throws InterruptedException { while (number!=0 ){this .wait();} number++; this .notifyAll(); } public synchronized void consumer () throws InterruptedException { while (number==0 ){ this .wait();} number--; this .notifyAll(); } }
JUC版
Lock lock = new ReentrantLock (); Condition condition = lock.newCondition(); condition.await(); condition.signalAll();
Condition :精准的通知和唤醒线程(线程顺序)
设置多个监视器,每个监视器监视一个资源
signal 唤醒指定监视器
public class ProducerConsumer_JUC { public static void main (String[] args) { DataJ date = new DataJ (); new Thread (()->{for (int i = 0 ; i < 10 ; i++) {date.A();}},"A" ).start(); new Thread (()->{for (int i = 0 ; i < 10 ; i++) {date.B();}},"B" ).start(); new Thread (()->{for (int i = 0 ; i < 10 ; i++) {date.C();}},"C" ).start(); } } class DataJ { private int number = 1 ; Lock lock = new ReentrantLock (); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); public void A () { lock.lock(); try { while (number!=1 ){ condition1.await(); } number=2 ; System.out.println(Thread.currentThread().getName()+"==>A" ); condition2.signal(); } catch (Exception e) { throw new RuntimeException (e); } finally { lock.unlock(); } } public void B () { lock.lock(); try { while (number!=2 ){ condition2.await(); } number=3 ; System.out.println(Thread.currentThread().getName()+"==>BB" ); condition3.signal(); } catch (Exception e) { } finally { lock.unlock(); } } public void C () {} }
锁8解
synchronized 锁的对象是方法的调用者
1、标准情况下,两个线程哪个先?
2、延迟后,哪个先?
synchronized 锁的对象是方法的调用者
public class Suo8 { public static void main (String[] args) { Phone phone = new Phone (); new Thread (()->{phone.sendSms();},"A" ).start(); new Thread (()->{phone.call();},"B" ).start(); } } class Phone { public synchronized void sendSms () {} public synchronized void call () {} }
3、同步方法和不同步方法的调用次序?
不是同步方法的(普通方法),无锁,不受锁影响 ==》先执行
public class Suo8_3 { public static void main (String[] args) { Phone_3 phone = new Phone_3 (); new Thread (()->{phone.sendSms();},"A" ).start(); new Thread (()->{phone.call();},"B" ).start(); } } class Phone_3 { public synchronized void sendSms () {} public void call () {} }
4、两个对象,两个同步方法,先后次序
按业务处理时间来觉得顺序,如A的业务时间比B长,则B先执行
public class Suo8_3 { public static void main (String[] args) { Phone_3 phone1 = new Phone_3 (); Phone_3 phone2 = new Phone_3 (); new Thread (()->{phone1.sendSms();},"A" ).start(); new Thread (()->{phone2.call();},"B" ).start(); } } class Phone_3 { public synchronized void sendSms () { TimeUnit.SECONDS.sleep(2 ); } public synchronized void call () { } }
5、一个对象,Class都是静态static同步方法,执行次序?
类一加载就有了,锁的是class ,线程A :sendSms先拿到锁
public class Suo8_3 { public static void main (String[] args) { Phone_3 phone = new Phone_3 (); new Thread (()->{phone.sendSms();},"A" ).start(); new Thread (()->{phone.call();},"B" ).start(); } } class Phone_3 { public static synchronized void sendSms () { TimeUnit.SECONDS.sleep(2 ); } public static synchronized void call () {} }
6、两个对象,同一个Class类模板,执行次序?
静态同步方法A先,一个同一个Class类模板,锁的是Class,与业务时长无关系,第一个线程A调用静态同步方法先拿到锁,先执行
public class Suo8_3 { public static void main (String[] args) { Phone_3 phone1 = new Phone_3 (); Phone_3 phone2 = new Phone_3 (); new Thread (()->{phone1.sendSms();},"A" ).start(); new Thread (()->{phone2.call();},"B" ).start(); } } class Phone_3 { public static synchronized void sendSms () { TimeUnit.SECONDS.sleep(2 ); } public static synchronized void call () { } }
7、一对象,同一个Class类,一个静态同步方法,一个普通同步方法,执行次序?
锁不同:静态锁的是Class类,普通同步锁的是调用者(不需要等待Class类锁)
public class Suo8_3 { public static void main (String[] args) { Phone_3 phone = new Phone_3 (); new Thread (()->{phone.sendSms();},"A" ).start(); new Thread (()->{phone.call();},"B" ).start(); } } class Phone_3 { public static synchronized void sendSms () { TimeUnit.SECONDS.sleep(2 ); } public synchronized void call () {} }
8、两对象,同一个Class类,一个静态同步方法,一个普通同步方法,执行次序?
同一个Class类模板,普通方法锁的是调用者,先执行普通同步方法
public class Suo8_3 { public static void main (String[] args) { Phone_3 phone1 = new Phone_3 (); Phone_3 phone2 = new Phone_3 (); new Thread (()->{phone1.sendSms();},"A" ).start(); new Thread (()->{phone2.call();},"B" ).start(); } } class Phone_3 { public static synchronized void sendSms () { TimeUnit.SECONDS.sleep(2 ); } public synchronized void call () {} }
总结 :
次序:普通方法 > 同步方法 > 静态同步方法
锁对象:
同步方法:锁的是调用者,①不同对象看业务时长 ; ②同对象看线程拿到锁次序
静态同步方法:锁的是唯一Class类模板; ①全静态同步看线程拿到锁次序 ;②一静一普,普锁调用者先
集合类 安全
ConcurrentModificationException异常
List
List 集合在并发下进行操作会 不安全 ,会报错:ConcurrentModificationException异常
List<String> list = new ArrayList <>(); for (int i = 0 ; i < 10 ; i++) { new Thread (()->{ list.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(list); },String.valueOf(i)).start(); }
CopyOnWriteArrayList List<String> list = new Vector <>(); List<String> list = Collections.synchronizedList(new ArrayList <>()); List<String> list = new CopyOnWriteArrayList <>();
CopyOnWriteArrayList 与 Vector
Vector的存在add方法是synchronized的,效率比较低
CopyOnWriteArrayList 的add方法不是同步方法,使用的是lock锁
Set CopyOnWriteArraySet
Set 集合在并发下进行操作会 不安全 ,会报错:ConcurrentModificationException异常
Set<String> set = Collections.synchronizedSet(new HashSet <>()); Set<String> set = new CopyOnWriteArraySet <>();
Map 加载因子 、初始化容量
重点:Map怎么用,默认等价于什么?
1、工作不用 HashMap
2、默认等价于 new HashMap<>(16,0.75);
ConcurrentHashMap
Map在并发下进行操作会 不安全 ,会报错:ConcurrentModificationException异常
Map<String, String> map = new ConcurrentHashMap <>();
Callable
Callable
接口类似于[Runnable
],因为它们都是为其实例可能由另一个线程执行的类设计的。然而,A Runnable
不返回结果,也不能抛出被检查的异常。
实现Callable接口,可实现带有返回值,泛型类型即为返回值
class MyThread implements Callable <String> { @Override public String call () { return "null" ; } }
1、启动Callable : 使用适配类 FutureTask
2、获取返回值 : futureTask.get();
3、多线程调用的方法只执行一次 : 例如方法中打印一次是因为FutureTask只会被线程run一次,判断不是第一次就return
4、结果可能需要等待,会阻塞
public class CallableTest { public static void main (String[] args) { FutureTask futureTask = new FutureTask (new MyThread ()); new Thread (futureTask,"A" ).start(); String value = (String) futureTask.get(); } } class MyThread implements Callable <String> { @Override public String call () { return "Hello" ; } }
常用辅助类
CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch (9 ); countDownLatch.countDown(); countDownLatch.await();
public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (9 ); for (int i = 0 ; i < 9 ; i++) { new Thread (()->{ countDownLatch.countDown(); },String.valueOf(i)).start(); } countDownLatch.await(); } }
CyclicBarrier
加法计数器
允许一组线程全部等待彼此 达到共同屏障点的同步辅助
栅栏,等待指定线程数量执行完毕在等待的时候一起通过
CyclicBarrier cyclicBarrier = new CyclicBarrier (7 , () -> {Runable接口,全部执行完毕执行这里的业务});cyclicBarrier.await();
public class CyclicBarrierTest { public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier (7 , () -> { System.out.println("线程达到预定数量" ); }); for (int i = 0 ; i < 6 ; i++) { final int temp=i; new Thread (()->{ cyclicBarrier.await(); },String.valueOf(i)).start(); } } }
Semaphore
自定义资源数量,多线程争取同一资源,设置各线程的拥有和释放资源
一个计数信号量。 在概念上,信号量维持一组许可证。
作用:多个共享资源互斥的使用,并发限流,控制最大的线程数
Semaphore semaphore = new Semaphore (3 ); semaphore.acquire(); semaphore.release();
public class SemaphoreTest { public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 6 ; i++) { new Thread (()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"得到停车位" ); TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName()+"离开停车位" ); } catch (InterruptedException e) { throw new RuntimeException (e); }finally { semaphore.release(); } },String.valueOf(i)).start(); } } }
读写锁
独占锁(写锁)一次只能被一个线程占有
共享锁(读锁)多个线程可以同时占有
ReadWriteLock
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.writeLock().lock(); lock.writeLock().unlock(); lock.readLock().lock(); lock.readLock().unlock();
读-读:可以共存
读-写:不能共存!
写-写:不能共存
public class ReadWriteLockTest { public static void main (String[] args) { MyCache myCache = new MyCache (); for (int i = 0 ; i < 10 ; i++) { final int temp = i; new Thread (()->{ myCache.write(temp+"" ,temp+"" ); myCache.read(temp+"" ); },String.valueOf(i)).start(); } } } class MyCache { private volatile Map<String, String> map = new HashMap <>(); private ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); public void write (String key,String value) { lock.writeLock().lock(); try { map.put(key,value); } catch (Exception e) { throw new RuntimeException (e);} finally { lock.writeLock().unlock(); } } public void read (String key) { lock.readLock().lock(); try { String objects = map.get(key);} catch (Exception e) { throw new RuntimeException (e);} finally { lock.readLock().unlock(); } } }
阻塞队列
ArrayBlockingQueue
使用场景:多线程并发处理,线程池
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue <>(1 );blockingQueue.add("c" ); blockingQueue.offer("c" ); blockingQueue.put("c" ); blockingQueue.offer("c" ,2 , TimeUnit.SECONDS); blockingQueue.remove(); blockingQueue.poll(); blockingQueue.take(); blockingQueue.poll(2 , TimeUnit.SECONDS);
方式
抛出异常
有返回值
阻塞等待
超时等待
添加
add
offer
put
offer
移除(弹出)
remove
poll
take
poll
检测队首元素
element
peek
-
-
总结:
当定义了指定大小的队列
添加
.add(value):超出队列抛出异常
.offer(value):超出队列返回false
.put(value):超出队列,一直阻塞等待
.offer(value,时间,时间单位):超出队列,阻塞等待指定时间
删除
remove:对于空队列使用弹出会抛出异常
poll:对于空队列使用弹出为null
take:对于没有这个元素,一直阻塞
poll:对于没有这个元素,阻塞等待指定时间
同步队列
SynchronousQueue
public class Queue1 { public static void main (String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue <>(); new Thread (()->{ blockingQueue.put("1" ); blockingQueue.put("2" ); },"T1" ).start(); new Thread (()->{ TimeUnit.SECONDS.sleep(1 ); blockingQueue.take(); TimeUnit.SECONDS.sleep(1 ); blockingQueue.take(); },"T2" ).start(); } }
线程池
三大方法 Executors
1、newSingleThreadExecutor – 容量 1
2、newFixedThreadPool(sum) – 自定义容量
3、newCachedThreadPool – 动态容量
Executors.newSingleThreadExecutor(); ExecutorService threadPool = Executors.newFixedThreadPool(5 ); Executors.newCachedThreadPool();
public static void main (String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(50 ); try { threadPool = Executors.newCachedThreadPool(); for (int i = 0 ; i < 30 ; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { throw new RuntimeException (e); } finally { threadPool.shutdown(); } }
7大参数
三大方法的源码
public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); } public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
ThreadPoolExecutor
public ThreadPoolExecutor (int corePoolSize, //核心线程池大小 int maximumPoolSize, //最大核心线程池大小 long keepAliveTime, //空闲线程存活时间 TimeUnit unit, //存活时间单位 BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory, //线程工厂,创建线程 RejectedExecutionHandler handler //拒绝策略 ) {}
创建的线程池大小默认是核心线程池的大小,当阻塞队列满了,每当队列溢出一个则线程池大小+1,直达到最大核心线程池的大小,在达到核心线程池大小和阻塞队列大小的时候触发拒绝策略
public static void main (String[] args) { ExecutorService threadPoolExecutor = new ThreadPoolExecutor ( 2 , 5 , 3 , TimeUnit.SECONDS, new ArrayBlockingQueue <>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy() ); try { for (int i = 0 ; i < 10 ; i++) { threadPoolExecutor.execute(()->{ }); } } catch (Exception e) { } finally { threadPoolExecutor.shutdown(); } }
四大拒绝策略
new ThreadPoolExecutor .AbortPolicy() new ThreadPoolExecutor .CallerRunsPolicy() new ThreadPoolExecutor .DiscardPolicy() new ThreadPoolExecutor .DiscardOldestPolicy()
小结
四大函数式接口
函数式接口:只有一个方法的接口(接口中的方法只能是抽象 或静态 的)
实际表达式里面就是接口里面的方法,把名字省略了,但接口实实在在定义了,所以实现的时候不用名字,但调用的时候是调用名字 的
@FunctionalInterface public interface Runnable { public abstract void run () ; }
函数型接口
public interface Function <T, R> { R apply (T t) ; }
Function function = new Function <String, String>() { @Override public String apply (String s) { return s; } }; function.apply("hello" ) Function<String, String> stringStringFunction =(s) -> {return s;}; Function<String, String> stringStringFunction = s-> s ; stringStringFunction.apply("world" );
断定型接口
public interface Predicate <T> { boolean test (T t) ; }
Predicate<String> predicate = new Predicate <String>() { @Override public boolean test (String s) { return s.isEmpty(); } }; System.out.println(predicate.test("" )); Predicate<String> predicatec = (String s) ->{return s.isEmpty();}; Predicate<String> predicatec = s -> s.isEmpty(); System.out.println(predicatec.test("c" ));
消费型接口
public interface Consumer <T> { void accept (T t) ; }
Consumer<String> consumer = new Consumer <String>() { @Override public void accept (String s) { System.out.println(s); } }; consumer.accept("consumer" ); Consumer<String> stringConsumer = (String s) -> {System.out.println(s);}; Consumer<String> stringConsumer = s -> { System.out.println(s);}; stringConsumer.accept("world" );
供给型接口
@FunctionalInterface public interface Supplier <T> { T get () ; }
Supplier supplier= new Supplier <String>() { @Override public String get () { return "hello" ; } }; System.out.println(supplier.get()); Supplier supplierc = ()->{return "world" ;};System.out.println(supplierc.get());
Stream流 程序存储与计算,计算交给 流 来操作
获取Stream流
ArrayList<String> list = new ArrayList <>(); Collections.addAll(list,"a" ,"b" ,"c" ,"d" ,"e" ); list.stream().forEach(s -> System.out.println(s)); HashMap<String, Integer> map = new HashMap <>(); map.put("aa" ,11 ); map.put("bb" ,22 ); map.keySet().stream().forEach(s -> System.out.println(s)); map.entrySet().stream().forEach(stringIntegerEntry -> System.out.println(stringIntegerEntry)); int arr[] = {1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 };Arrays.stream(arr).forEach(s -> System.out.println(s)); Stream.of("a" ,"b" ,"c" ,"d" ,"e" ).forEach(s -> System.out.println(s));
public static void main (String[] args) { User u1 = new User (1 ,"a" ,10 ); User u2 = new User (2 ,"b" ,11 ); User u3 = new User (3 ,"c" ,12 ); User u4 = new User (4 ,"d" ,13 ); User u5 = new User (5 ,"e" ,14 ); User u6 = new User (6 ,"f" ,15 ); User u7 = new User (7 ,"g" ,16 ); List<User> list = Arrays.asList(u1, u2, u3, u4, u5, u6, u7); list.stream() .filter((User user) -> {return user.getId()%2 ==0 ;}) .filter(user -> {return user.getAge() > 11 ;}) .map(user -> {return user.getName().toUpperCase();}) .sorted((U1,U2)->{return U2.compareTo(U1);}) .limit(1 ) .forEach(System.out::println); }
中间方法
将流合成到对应的数据 .collect(Collectors.toList());
filter
ArrayList<String> list = new ArrayList <>(); Collections.addAll(list,"aa" ,"bb" ,"cc" ,"cd" ,"ce" ); list.stream().filter(new Predicate <String>() { @Override public boolean test (String s) { return s.startsWith("a" ) ; } }).forEach(s -> System.out.println(s)); list.stream().filter(s -> {return s.startsWith("a" ) ;}) .forEach(s -> System.out.println(s));
limit list.stream().limit(3 ).forEach(s -> { System.out.println(s); });
skip list.stream().skip(3 ).forEach(s -> System.out.println(s));
concat
Stream.concat(stringStream,stream)
map ArrayList<String> list = new ArrayList <>(); Collections.addAll(list,"aa-1" ,"bb-2" ,"cc-3" ,"cd-4" ,"ce-5" ); list.stream().map(new Function <String, Integer>() { @Override public Integer apply (String s) { String[] split = s.split("-" ); int age = Integer.parseInt(split[1 ]); return age; } }).forEach(System.out::println); list.stream().map( s -> Integer.parseInt(s.split("-" )[1 ])).forEach(System.out::println); Stream.concat(stringStream,stream).map(new Function <String, Actor>() { @Override public Actor apply (String s) { String name = s.split("," )[0 ]; int age = Integer.parseInt(s.split("," )[1 ]); return new Actor (name,age); } }).forEach(System.out::println); Stream.concat(stringStream,stream).map(s -> new Actor (s.split("," )[0 ],Integer.parseInt(s.split("," )[1 ]))).forEach(System.out::println);
总结方法
Map<String, String> collect = list.stream() .collect(Collectors.toMap( s -> s.split("-" )[0 ], s -> s.split("-" )[1 ] )); System.out.println(collect);
String[] split = stringBuilder.toString().split("-" ); Integer[] integers = Arrays.stream(split).map(Integer::parseInt).sorted().toArray(Integer[]::new );
并行流计算 parallel()
LongStream
是一个用于处理长整数的流。
.parallel()
:这个方法调用将 LongStream
转换为一个并行流(parallel stream)。并行流允许同时在多个处理器核心上进行操作,以提高处理速度。
.reduce(0, Long::sum)
:这一部分使用 reduce
操作来计算流中所有元素的和。reduce
接受两个参数:初始值和一个用于累积操作的二元运算符。
public static void test3 () throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); long sum = LongStream.range(1L , 10_0000_0000L ).parallel().reduce(0 , Long::sum); long end = System.currentTimeMillis(); System.out.println("sum = " +sum+" 执行时间 = " +(end-start)); }
ForkJoin
ForkJoin特点:工作窃取
双端队列
例如:B已经执行完,而A没有执行完,B会窃取A的任务来执行
1、forkjoinPool通过它来执行l
2、计算任务forkjoinPool.execute(ForkJoinTask task)
1、选择递归任务RecursiveTask :继承RecursiveTask ,实现compute 方法
public class ForkjoinTest extends RecursiveTask <Long> { private Long start; private Long end; private Long temp = 10000L ; public ForkjoinTest (Long start, Long end) { this .start = start; this .end = end; } @Override protected Long compute () { Long sum = 0L ; if ((end-start)<temp){ for (Long i = start; i <end; i++) { sum+=i; } return sum; }else { long middle = (start + end)/2 ; ForkjoinTest task1 = new ForkjoinTest (start, middle); task1.fork(); ForkjoinTest task2 = new ForkjoinTest (middle + 1 , end); task2.fork(); return task1.join() + task2.join(); } } }
2、先定义ForkJoinPool ,使用submsit 提交任务,再通过get 获得运行结果
public static void test2 () throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool (); ForkJoinTask<Long> task = new ForkjoinTest (1L ,10_0000_0000L ); ForkJoinTask<Long> submsit = forkJoinPool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis();
异步回调
异步执行
成功回调
失败回调
public class FutureTest { public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{ try { TimeUnit.SECONDS.sleep(10 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("异步调用" ); }); completableFuture.get(); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()+"supplyAsync=>completableFuture" ); int s = 1 /0 ; return 1024 ; }); System.out.println("流程线程1" ); completableFuture.whenComplete((t,u)->{ System.out.println("t=>" +t); System.out.println("u=>" +u); }).exceptionally((throwable -> { throwable.printStackTrace(); return 404 ; })).get(); System.out.println("流程线程2" ); } }
JMM
JMM:Java内存模型 ,不存在的东西,概念!约定!
关于JMM的一些同步的约定:
1、线程解锁前,必须把共享变量立刻刷回主存。
2、线程加锁前,必须读取主存中的最新值到工作内存中!
3、加锁和解锁是同一把锁
线程、工作内存、主内存
8种操作: (读、加载)(使用、返回)(存储、写)(加锁、解锁)
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
不允许一个线程将没有assign的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
Volatile+原子引用 Volatile是Java虚拟机提供轻量级的同步机制
private static int num = 0 ; public static void main (String[] args) { new Thread (()->{ while (num == 0 ){ } }).start(); TimeUnit.SECONDS.sleep(1 ); num = 1 ; }
1、保证可见性
例如:线程1先读取主内存的值,线程2后读,但线程2修改资源并写入了主内存,线程1不知道,如果以这个值为运行条件则一直运行下去
private volatile static int num = 0 ;
2、不保证原子性
private volatile static int num = 0 ; public static void add () { num++; } public static void main (String[] args) throws InterruptedException { for (int i = 0 ; i < 20 ; i++) { new Thread (()->{ for (int i1 = 0 ; i1 < 1000 ; i1++) { add();} }).start(); } while (Thread.activeCount()>2 ){ Thread.yield (); } }
如果不加Iock和synchronized,怎么样保证原子性?
使用原子类中的包装类,解决原子性问题(synchronized、Iock外的方案)
private static AtomicInteger num = new AtomicInteger ();public static void add () { num.getAndIncrement(); }
3、禁止指令重排
使用 volatile ,由于内存屏障,可以保证避免指令重排的现象产生
单例模式
使用单例模式来创建实例,使用 **getInstance**
方法来获取实例,这样可以确保只有一个实例被创建
懒汉式 :
创建时机 :懒汉式单例在第一次使用时才会被创建,也就是说,它是延迟初始化的。即实例在需要的时候才被创建。
线程安全性 :最简单的懒汉式实现可能不是线程安全的,多个线程可以同时检测到实例为 null,然后尝试创建多个实例。为了确保线程安全,需要引入同步机制(如双重检查锁定)来防止多次实例化。
懒加载 :懒汉式允许懒加载,即只有在需要的时候才会创建实例,这可以节省资源,特别是在实例较大或初始化耗时较长的情况下。
饿汉式 :
创建时机 :饿汉式单例在类加载时就会被创建,无论是否实际使用。即实例在类加载时就被创建好。
线程安全性 :饿汉式单例天生线程安全,因为实例在类加载时就已经创建,不需要担心多线程并发创建多个实例的问题。
不懒加载 :饿汉式不支持懒加载,因为实例在类加载时就已经存在。这可能会浪费一些内存,特别是在实例很大或初始化很复杂的情况下。
饿汉式
一上来就全部加载资源,无论是否使用,实例在类加载时就创建了,会造成资源浪费
饿汉式单例的优点是它在多线程环境下是线程安全的,因为实例在类加载时就创建了,不需要考虑线程同步问题。
public class Hungry { private Hungry () {} private final static Hungry HUNGRY = new Hungry (); public static Hungry getInstance () { return HUNGRY; } }
懒汉式
1–1:可破坏性的单例模式 ①:多个线程可以同时检测到实例为 null,然后尝试创建多个实例
public class LazyMan { private LazyMan () { } private volatile static LazyMan lazyMan; public static LazyMan getInstance () { if (lazyMan==null ){ lazyMan=new LazyMan (); } return lazyMan; } public static void main (String[] args) { for (int i = 0 ; i < 10 ; i++) { new Thread (()->{ LazyMan.getInstance(); }).start(); } } }
1-2:解决多线程破坏单例 :引入同步机制(如双重检查锁定 )来防止多次实例化
public static LazyMan getInstance () { if (lazyMan==null ){ synchronized (LazyMan.class){ if (lazyMan==null ){ lazyMan=new LazyMan (); } } } return lazyMan; }
2–1:可破坏性的单例模式 ②:尽管实现了引入同步机制(如双重检查锁定 )来防止多次实例化,还是可以通过反射来破坏,创建多个实例破坏单例模式
LazyMan lazyMan1 = new LazyMan ();Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null ); declaredConstructor.setAccessible(true ); LazyMan lazyMan2 = declaredConstructor.newInstance();
2-2:解决额外反射创建实例破坏单例 :局限于先使用单例模式创建实例再使用反射创建实例,如果都是通过反射创建实例,则又被破坏
private LazyMan () { synchronized (LazyMan.class){ if (lazyMan!=null ){ throw new RuntimeException ("禁止使用反射破坏单例" ); } } }
2-3:解决全用反射创建实例破坏单例 :定义变量标志位控制判断(红绿灯)
private static boolean chen = false ; private LazyMan () { synchronized (LazyMan.class){ if (chen == false ){ chen = true ; }else { throw new RuntimeException ("禁止使用反射破坏单例" ); } } }
2-4:反射破坏隐藏变量再进行破坏单例 :破解标志位
Field chen1 = LazyMan.class.getDeclaredField("chen" ); chen1.setAccessible(true ); Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null ); declaredConstructor.setAccessible(true ); LazyMan lazyMan1 = declaredConstructor.newInstance(); chen1.set(lazyMan1,false ); LazyMan lazyMan2 = declaredConstructor.newInstance();
最佳单例:枚举 在枚举实现的单例模式中,单例实例通常是一个枚举的常量,例如 INSTANCE
,而不需要显式调用 getInstance
方法来获取实例。这是因为枚举的特性保证了在类加载时,常量 会被初始化,从而实现了单例。
public enum EnumTest { INSTANCE; public EnumTest getInstance () { return INSTANCE; } public static void main (String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { EnumTest instance = EnumTest.INSTANCE; Constructor<EnumTest> declaredConstructor = EnumTest.class.getDeclaredConstructor(String.class,int .class); declaredConstructor.setAccessible(true ); EnumTest enumTest = declaredConstructor.newInstance(); } }
CAS
CAS :比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!
类似乐观锁
Unsafe :调用底层
AtomicInteger atomicInteger = new AtomicInteger (2023 );atomicInteger.compareAndSet(2023 , 999 ); System.out.println(atomicInteger.get());
缺点:
1、循环会耗时
2、一次性只能保证一个共享变量的原子性
3、ABA问题
ABA问题
aba不解决资源会被提前挪用,这不是我们希望的
A期望是1
在A进入未修改时,速度快,B也期望是1,改为了3,又期望是3,改为了1
A以为还是1,没有更改过,然后更改为2
虽然A的期望值最后没有变化,但中间的时候已经发生了变化
AtomicInteger atomicInteger = new AtomicInteger (2023 );atomicInteger.compareAndSet(2023 , 2020 ); atomicInteger.compareAndSet(2020 , 2023 ); atomicInteger.compareAndSet(2023 , 999 );
原子引用解决
解决 :和乐观锁原理相同,判断版本号
使用原子引用解决
public class CAS_Demo { public static void main (String[] args) { AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference <>(1 ,1 ); new Thread (()->{ int integer = atomicStampedReference.getStamp(); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(1 , 2 , atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 ); atomicStampedReference.compareAndSet(2 , 1 , atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 ); },"B" ).start(); new Thread (()->{ int integer = atomicStampedReference.getStamp(); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(1 , 9 , integer, integer + 1 ); },"A" ).start(); } }
可重入锁
synchronized :获得sms的锁,但sms也调用call了,对于也获得了call的锁—–可重入锁
public class CRS_1 { public static void main (String[] args) { Phone phone = new Phone (); new Thread (()->{ phone.sms(); },"A" ).start(); new Thread (()->{ phone.sms(); },"B" ).start(); } } class Phone { public synchronized void sms () { call(); } public synchronized void call () { } }
ReentrantLock :
上锁获得两把钥匙:解对应的lock.unlock();
锁必须配对,一个锁lock.lock();即要解锁一次lock.unlock();
public class CRS_1 { public static void main (String[] args) { Phone phone = new Phone (); new Thread (()->{ phone.sms(); },"A" ).start(); new Thread (()->{ phone.sms(); },"B" ).start(); } } class Phone { Lock lock = new ReentrantLock (); public void sms () { lock.lock(); try { System.out.println(Thread.currentThread().getName()+"=>" +" sms" ); call(); } catch (Exception e) { throw new RuntimeException (e); } finally { lock.unlock(); } } public void call () { lock.lock(); try { System.out.println(Thread.currentThread().getName()+"=>" +" call" ); } catch (Exception e) { throw new RuntimeException (e); } finally { lock.unlock(); } } }
自旋锁
public class SpinlockDemo { AtomicReference<Thread> atomicReference = new AtomicReference <>(); public void myLock () { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "=====>mylock" ); while (!atomicReference.compareAndSet(null ,thread)){ } } public void myUnLock () { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"=====>myUnlock" ); atomicReference.compareAndSet(thread,null ); } }
public class SpinLockTest { public static void main (String[] args) throws InterruptedException { SpinlockDemo lock = new SpinlockDemo (); new Thread (()->{ lock.myLock(); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.myUnLock(); } },"T1" ).start(); TimeUnit.SECONDS.sleep(1 ); new Thread (()->{ lock.myLock(); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.myUnLock(); } },"T2" ).start(); } }
死锁
public class DeadLockDemo { public static void main (String[] args) { String lockA= "lockA" ; String lockB= "lockB" ; new Thread (new MyThread (lockA,lockB),"T1" ).start(); new Thread (new MyThread (lockB,lockA),"T2" ).start(); } } class MyThread implements Runnable { private String lockA; private String lockB; public MyThread (String lockA, String lockB) { this .lockA = lockA; this .lockB = lockB; } @Override public void run () { synchronized (lockA){ System.out.println(Thread.currentThread().getName()+" lock " +lockA+"=>get " +lockB); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } synchronized (lockB){ System.out.println(Thread.currentThread().getName()+" lock " +lockB+"=>get" +lockA); } } } }
解决问题 JPS
1、查看进程号 :jps -l
2、查看进程信息:jstack 进程号
实践 秒杀
超卖问题
进入购买,库存 - 1
【判断条件:使用乐观锁,使用版本号思想,在更新库存的时候判断查询时候的版本号是否一致,防止查询后修改前有事务进入修改,如一致 == 库存 -1,生成订单号】
【CAS 】:类似判断版本号:既然需要更新前判断和查询的的版本是否一致,直接判断库存是否一致即可:但会导致不一致的事务不成功,则很多事务不成功,卖出少,解决方法:判断库存 > 0】
存在一人多单情况
一人一单
每次调用userId.toString都是一个全新的对象,但使用intern后:直接去字符串常量池寻找值一样的地址,如果每次的值都一样,则返回的地址一样
Long userId = ....getId()synchronized (userId.toString().intern()){ }
悲观锁:要先获取锁,提交事务,再释放锁 才能线程安全