JUC并发编程

  • java.util.concurrent


线程和进程

  • java默认有 2 个线程 :main、GC

java 不可以 开启线程

  • 操作本地方法,底层的C++,Java无法直接操l作硬件,在虚拟机上运行



并发、并行

并发:(多线程操作同一资源) – 充分利用CPU资源

并行:(多线程同时跑 – 线程池 )

//核心数
Runtime.getRuntime().availableProcessors()


线程状态 Thread.State

public enum State {

//新生
NEW,

//运行
RUNNABLE,

//阻塞
BLOCKED,

//等待
WAITING,

//超时等待
TIMED_WAITING,

//终止
TERMINATED;
}


wait / sleep 区别

1、来自不同的类

wait => Object

sleep => Thread


2、关于锁的释放

wait 会释放锁,sleep一直抱着锁


3、使用范围

wait : 必须在同步代码块、同步方法中

sleep : 任意地方


4、捕获异常

都需要捕获异常







Lock锁

synchronized

  • 类上 / 方法

image-20230925163107505



Lock

  • 创建锁:Lock lock = new ReentrantLock();
  • try 前上锁,finall 解锁

image-20230925163309271

公平锁:公平,可以先来后到

非公平锁:不公平,允许插队【默认】



synchronizedLock 区别

1、Synchronized 内置的ava关键字,Lock是一个java类

2、Synchronized 无法判断获取锁的状态,Lock可以判断是否获取到了锁

3、Synchronized 会自动释放锁,lock必须要手动释放锁!如果不释放锁,死锁

4、Synchronized 线程1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去;

5、Synchronized 可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置);

6、Synchronized 适合锁少量的代码同步问题,Lock适合锁大量的同步代码!






生产者和消费者问题


synchronized版

image-20230925163100352

DataC date = new DataC();
new Thread(()->{ for (int i = 0; i < 10; i++) {date.producer();}},"A").start();
new Thread(()->{ for (int i = 0; i < 10; i++) {date.producer();}},"AA").start();
new Thread(()->{ for (int i = 0; i < 10; i++) {date.consumer();}},"B").start();
new Thread(()->{ for (int i = 0; i < 10; i++) {date.consumer();}},"BB").start();


class DataC{
private int number = 0;

public synchronized void producer() throws InterruptedException {
//如果使用if,只判断一次,出现虚假唤醒
while (number!=0){this.wait();}
number++;
this.notifyAll();
}

public synchronized void consumer() throws InterruptedException {
while (number==0){ this.wait();}
number--;
this.notifyAll();
}
}



/**
*线程随机运行
**/


JUC版

  • Condition监视器

   Lock lock = new ReentrantLock();
Condition condition = lock.newCondition(); //定义一个监视器
condition.await(); //等待
condition.signalAll(); //唤醒全部

Condition :精准的通知和唤醒线程(线程顺序)

  • 设置多个监视器,每个监视器监视一个资源
  • signal 唤醒指定监视器
public class ProducerConsumer_JUC {
public static void main(String[] args) {
DataJ date = new DataJ();
new Thread(()->{for (int i = 0; i < 10; i++) {date.A();}},"A").start();
new Thread(()->{for (int i = 0; i < 10; i++) {date.B();}},"B").start();
new Thread(()->{for (int i = 0; i < 10; i++) {date.C();}},"C").start();
}
}

class DataJ{
private int number = 1; //1A,2B,3C
Lock lock = new ReentrantLock();
//开启多个监视器 -- 各自监视对应的资源
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
public void A(){
lock.lock();
try {
while (number!=1){
condition1.await(); //1资源等待
}
number=2;
System.out.println(Thread.currentThread().getName()+"==>A");
condition2.signal(); //唤醒2资源

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

public void B(){
lock.lock();
try {
while (number!=2){
condition2.await(); //2资源等待
}
number=3;
System.out.println(Thread.currentThread().getName()+"==>BB");
condition3.signal(); //唤醒3资源
} catch (Exception e) {
} finally {
lock.unlock();
}
}

public void C(){}
}





锁8解

synchronized 锁的对象是方法的调用者


1、标准情况下,两个线程哪个先?

2、延迟后,哪个先?

synchronized 锁的对象是方法的调用者

  • 在一个对象中,锁是加在对象上的!同一时刻对于每一个类实例,其所有声明为 synchronized 的成员函数中至多只有一个处于可执行状态

  • 例如调用对象同一个的都是 phone,用同一把锁,谁先拿到谁先执行,例如线程A先拿到锁

//  A  ==>  B
public class Suo8 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{phone.sendSms();},"A").start();
new Thread(()->{phone.call();},"B").start();
}
}

class Phone{
public synchronized void sendSms() {}
public synchronized void call(){}
}


3、同步方法和不同步方法的调用次序?

不是同步方法的(普通方法),无锁,不受锁影响 ==》先执行

//B  ==>  A
public class Suo8_3 {
public static void main(String[] args) {
Phone_3 phone = new Phone_3();
new Thread(()->{phone.sendSms();},"A").start();
new Thread(()->{phone.call();},"B").start();
}
}

class Phone_3{
public synchronized void sendSms() {}
public void call(){}
}


4、两个对象,两个同步方法,先后次序

按业务处理时间来觉得顺序,如A的业务时间比B长,则B先执行

public class Suo8_3 {
public static void main(String[] args) {
//两个对象,两个调用者,两把锁
Phone_3 phone1 = new Phone_3();
Phone_3 phone2 = new Phone_3();

new Thread(()->{phone1.sendSms();},"A").start();
new Thread(()->{phone2.call();},"B").start();
}
}

class Phone_3{
public synchronized void sendSms() {
TimeUnit.SECONDS.sleep(2);//模拟业务
}

public synchronized void call(){ }
}


5、一个对象,Class都是静态static同步方法,执行次序?

类一加载就有了,锁的是class,线程A :sendSms先拿到锁

// A  ==>  B
public class Suo8_3 {
public static void main(String[] args) {
Phone_3 phone = new Phone_3();
new Thread(()->{phone.sendSms();},"A").start();
new Thread(()->{phone.call();},"B").start();
}
}

class Phone_3{
public static synchronized void sendSms() {
TimeUnit.SECONDS.sleep(2);
}
public static synchronized void call(){}
}



6、两个对象,同一个Class类模板,执行次序?

静态同步方法A先,一个同一个Class类模板,锁的是Class,与业务时长无关系,第一个线程A调用静态同步方法先拿到锁,先执行

public class Suo8_3 {
public static void main(String[] args) {
Phone_3 phone1 = new Phone_3();
Phone_3 phone2 = new Phone_3();

new Thread(()->{phone1.sendSms();},"A").start();
new Thread(()->{phone2.call();},"B").start();
}
}

class Phone_3{
public static synchronized void sendSms() {
TimeUnit.SECONDS.sleep(2);
}
public static synchronized void call(){ }
}


7、一对象,同一个Class类,一个静态同步方法,一个普通同步方法,执行次序?

锁不同:静态锁的是Class类,普通同步锁的是调用者(不需要等待Class类锁)

//  B  ==>  A
public class Suo8_3 {
public static void main(String[] args) {
Phone_3 phone = new Phone_3();

new Thread(()->{phone.sendSms();},"A").start();
new Thread(()->{phone.call();},"B").start();
}
}

class Phone_3{
public static synchronized void sendSms() {
TimeUnit.SECONDS.sleep(2);
}
public synchronized void call(){}
}


8、两对象,同一个Class类,一个静态同步方法,一个普通同步方法,执行次序?

同一个Class类模板,普通方法锁的是调用者,先执行普通同步方法

public class Suo8_3 {
public static void main(String[] args) {
Phone_3 phone1 = new Phone_3();
Phone_3 phone2 = new Phone_3();

new Thread(()->{phone1.sendSms();},"A").start();
new Thread(()->{phone2.call();},"B").start();
}
}

class Phone_3{
public static synchronized void sendSms() {
TimeUnit.SECONDS.sleep(2);
}

public synchronized void call(){}
}


总结

次序:普通方法 > 同步方法 > 静态同步方法

锁对象:

  • 同步方法:锁的是调用者,①不同对象看业务时长 ; ②同对象看线程拿到锁次序
  • 静态同步方法:锁的是唯一Class类模板; ①全静态同步看线程拿到锁次序 ;②一静一普,普锁调用者先





集合类 安全

  • ConcurrentModificationException异常

List

  • List 集合在并发下进行操作会 不安全 ,会报错:ConcurrentModificationException异常
//异常代码复现
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}


CopyOnWriteArrayList

//1.  
List<String> list = new Vector<>();

//2.
List<String> list = Collections.synchronizedList(new ArrayList<>());

//3. CopyOnWrite:写入时复制。写入的时候复制一份,写入完再插入,避免覆盖
List<String> list = new CopyOnWriteArrayList<>();

CopyOnWriteArrayList 与 Vector

  • Vector的存在add方法是synchronized的,效率比较低
  • CopyOnWriteArrayList 的add方法不是同步方法,使用的是lock锁





Set

CopyOnWriteArraySet

  • Set 集合在并发下进行操作会 不安全 ,会报错:ConcurrentModificationException异常
Set<String> set = Collections.synchronizedSet(new HashSet<>());

Set<String> set = new CopyOnWriteArraySet<>();





Map

加载因子初始化容量

重点:Map怎么用,默认等价于什么?

1、工作不用 HashMap

2、默认等价于 new HashMap<>(16,0.75);



ConcurrentHashMap

  • Map在并发下进行操作会 不安全 ,会报错:ConcurrentModificationException异常
Map<String, String> map = new ConcurrentHashMap<>();





Callable

  • Callable接口类似于[Runnable],因为它们都是为其实例可能由另一个线程执行的类设计的。然而,A Runnable不返回结果,也不能抛出被检查的异常。

实现Callable接口,可实现带有返回值,泛型类型即为返回值

//和Runnable差不多,不过要重写call,且根据泛型类型参数返回对应的类型
class MyThread implements Callable<String> {

@Override
public String call(){
return "null";
}
}



image-20230926145506977

1、启动Callable使用适配类 FutureTask

2、获取返回值futureTask.get();

3、多线程调用的方法只执行一次 : 例如方法中打印一次是因为FutureTask只会被线程run一次,判断不是第一次就return

4、结果可能需要等待,会阻塞

public class CallableTest {
public static void main(String[] args){

FutureTask futureTask = new FutureTask(new MyThread());
new Thread(futureTask,"A").start();
String value = (String) futureTask.get();//get可能会产生阻塞(看业务返回时间,把它放最后一行)
}
}

class MyThread implements Callable<String> {

@Override
public String call(){
return "Hello";
}
}





常用辅助类


CountDownLatch

  • 减法计数器

  • 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。

CountDownLatch countDownLatch = new CountDownLatch(9);	//倒计时总数 9 , 必须要执行任务的时候,再使用

countDownLatch.countDown(); //计数器-1操作

countDownLatch.await(); //等待计数器归0,再向下执行
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//倒计时总数 9 , 必须要执行任务的时候,再使用
CountDownLatch countDownLatch = new CountDownLatch(9);
for (int i = 0; i < 9; i++) {
new Thread(()->{
countDownLatch.countDown(); //-1
},String.valueOf(i)).start();
}
countDownLatch.await(); //等待计数器归0,再向下执行
}
}




CyclicBarrier

  • 加法计数器
  • 允许一组线程全部等待彼此达到共同屏障点的同步辅助
  • 栅栏,等待指定线程数量执行完毕在等待的时候一起通过
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {Runable接口,全部执行完毕执行这里的业务});

cyclicBarrier.await(); //等待(+1)
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("线程达到预定数量");
});

for (int i = 0; i < 6; i++) {
final int temp=i; //线程访问局部变量要设置中间变量且为final
new Thread(()->{
cyclicBarrier.await(); //等待(+1)
},String.valueOf(i)).start();
}
}
}




Semaphore

  • 自定义资源数量,多线程争取同一资源,设置各线程的拥有和释放资源
  • 一个计数信号量。 在概念上,信号量维持一组许可证。
  • 作用:多个共享资源互斥的使用,并发限流,控制最大的线程数
Semaphore semaphore = new Semaphore(3);	//信号量数量
semaphore.acquire(); //获得,如果已经满了,等待,等待到其他线程释放为止
semaphore.release(); //释放,将当前信号量释放 +1,唤醒等待的线程
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
semaphore.acquire(); //获得,如果已经满了,等待,等待到其他线程释放为止
System.out.println(Thread.currentThread().getName()+"得到停车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开停车位");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
semaphore.release(); //释放
}
},String.valueOf(i)).start();
}
}
}





读写锁

  • ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入。 read lock可以由多个阅读器线程同时进行,只要没有作者。 write lock是独家的。

独占锁(写锁)一次只能被一个线程占有

共享锁(读锁)多个线程可以同时占有

ReadWriteLock

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //细粒度的读写锁

lock.writeLock().lock(); //写锁加锁
lock.writeLock().unlock(); //写锁解锁

lock.readLock().lock(); //读锁加锁
lock.readLock().unlock(); //读硕解锁

读-读:可以共存

读-写:不能共存!

写-写:不能共存

public class ReadWriteLockTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 10; i++) {
final int temp = i;
new Thread(()->{
myCache.write(temp+"",temp+"");
myCache.read(temp+"");
},String.valueOf(i)).start();
}
}
}

class MyCache{
private volatile Map<String, String> map = new HashMap<>(); //缓存
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();//细粒度锁:读写锁

public void write(String key,String value){
lock.writeLock().lock(); //写锁加锁
try {
map.put(key,value);
} catch (Exception e) { throw new RuntimeException(e);}
finally {
lock.writeLock().unlock(); //写锁解锁
}
}

public void read(String key){
lock.readLock().lock(); //读锁加锁
try { String objects = map.get(key);}
catch (Exception e) { throw new RuntimeException(e);}
finally { lock.readLock().unlock(); //读硕解锁
}
}
}





阻塞队列

  • ArrayBlockingQueue

  • 使用场景:多线程并发处理,线程池


image-20230926171217780



//队列大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(1);

blockingQueue.add("c"); //出现异常即停止
blockingQueue.offer("c"); //有返回值不报异常
blockingQueue.put("c"); //阻塞等待,一直阻塞等待
blockingQueue.offer("c",2, TimeUnit.SECONDS); //超时等待

blockingQueue.remove(); //出现异常即停止
blockingQueue.poll(); //有返回值不报异常
blockingQueue.take(); //阻塞等待,一直阻塞等待
blockingQueue.poll(2, TimeUnit.SECONDS); //超时等待
方式 抛出异常 有返回值 阻塞等待 超时等待
添加 add offer put offer
移除(弹出) remove poll take poll
检测队首元素 element peek - -

总结:

当定义了指定大小的队列

添加

  • .add(value):超出队列抛出异常
  • .offer(value):超出队列返回false
  • .put(value):超出队列,一直阻塞等待
  • .offer(value,时间,时间单位):超出队列,阻塞等待指定时间

删除

  • remove:对于空队列使用弹出会抛出异常
  • poll:对于空队列使用弹出为null
  • take:对于没有这个元素,一直阻塞
  • poll:对于没有这个元素,阻塞等待指定时间


同步队列

SynchronousQueue

  • 容量为1

  • 进一,必需等待取出,才能继续进

  • put 了元素必须 take 取出来,否则不能继续put

public class Queue1 {
public static void main(String[] args) {
//队列大小
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

new Thread(()->{
blockingQueue.put("1");
blockingQueue.put("2");
},"T1").start();

new Thread(()->{
TimeUnit.SECONDS.sleep(1);
blockingQueue.take();
TimeUnit.SECONDS.sleep(1);
blockingQueue.take();
},"T2").start();
}
}






线程池

image-20230926211652152


三大方法

Executors

1、newSingleThreadExecutor – 容量 1

2、newFixedThreadPool(sum) – 自定义容量

3、newCachedThreadPool – 动态容量

Executors.newSingleThreadExecutor();    //线程池容量 1
ExecutorService threadPool = Executors.newFixedThreadPool(5); //固定大小线程池
Executors.newCachedThreadPool(); //动态大小线程池

public static void main(String[] args) {

//ExecutorService threadPool = Executors.newSingleThreadExecutor(); //线程池容量 1
ExecutorService threadPool = Executors.newFixedThreadPool(50); //固定大小线程池
//ExecutorService threadPool = Executors.newCachedThreadPool(); //动态大小线程池
try {
threadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 30; i++) {
threadPool.execute(()->{ //放入线程池
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
threadPool.shutdown(); //关闭线程池
}
}




7大参数

三大方法的源码

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

//均是new ThreadPoolExecutor

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,	//核心线程池大小
int maximumPoolSize, //最大核心线程池大小
long keepAliveTime, //空闲线程存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂,创建线程
RejectedExecutionHandler handler //拒绝策略
)
{}

创建的线程池大小默认是核心线程池的大小,当阻塞队列满了,每当队列溢出一个则线程池大小+1,直达到最大核心线程池的大小,在达到核心线程池大小和阻塞队列大小的时候触发拒绝策略

public static void main(String[] args) {
//自定义线程池 工作中使用的是 ThreadPoolExecutor
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() //当前策略当超出最大承载抛出异常
);
try {
//最大承载:workQueue + maximumPoolSize 超出最大承载抛出异常
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(()->{
});
}
} catch (Exception e) {
} finally {
threadPoolExecutor.shutdown();
}
}




四大拒绝策略

new ThreadPoolExecutor.AbortPolicy()  //当前策略当超出最大承载抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() //拒绝进入线程池,那个方法创建的线程哪个方法(main)自己运行
new ThreadPoolExecutor.DiscardPolicy() //队列满了,不会抛出异常,丢掉拒绝的线程
new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,把最前面阻塞队列的任务丢掉,并执行当前任务


小结

  • 线程池的最大线程如何定义:

    ①CPU密集型 :核心数:Runtime.getRuntime().availableProcessors()

    ②IO密集型 : 程序十分耗IO的线程






四大函数式接口

函数式接口:只有一个方法的接口(接口中的方法只能是抽象静态的)

  • 实际表达式里面就是接口里面的方法,把名字省略了,但接口实实在在定义了,所以实现的时候不用名字,但调用的时候是调用名字
//例如  Runnable接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}


函数型接口

  • 有一个输入参数,一个输出
public interface Function<T, R> {
R apply(T t);
}
  • Function可指任何类
// ================例子

//原写法
Function function = new Function<String, String>() {
@Override
public String apply(String s) {
return s;
}
};
function.apply("hello")


//使用lambda
Function<String, String> stringStringFunction =(s) -> {return s;};
Function<String, String> stringStringFunction = s-> s ; //简化
stringStringFunction.apply("world");


断定型接口

  • 一个输入参数,返回值只能是布尔值
public interface Predicate<T> {
boolean test(T t);
}
  • 例如 Predicate
Predicate<String> predicate = new Predicate<String>() {
@Override
public boolean test(String s) {
return s.isEmpty();
}
};
System.out.println(predicate.test(""));

//使用lambda
Predicate<String> predicatec = (String s) ->{return s.isEmpty();};
Predicate<String> predicatec = s -> s.isEmpty();
System.out.println(predicatec.test("c"));


消费型接口

  • 只有输入,无返回值
public interface Consumer<T> {
void accept(T t);
}
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
consumer.accept("consumer");

//使用lambda
Consumer<String> stringConsumer = (String s) -> {System.out.println(s);};
Consumer<String> stringConsumer = s -> { System.out.println(s);};
stringConsumer.accept("world");


供给型接口

  • 没有参数,只有返回值
@FunctionalInterface
public interface Supplier<T> {
T get();
}
Supplier supplier= new Supplier<String>() {
@Override
public String get() {
return "hello";
}
};
System.out.println(supplier.get());

Supplier supplierc = ()->{return "world";};
System.out.println(supplierc.get());





Stream流

程序存储与计算,计算交给 流 来操作

获取Stream流

  • 转换为流 xx**.stream()**
//////获取stream流

//list(单列集合)
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list,"a","b","c","d","e");
list.stream().forEach(s -> System.out.println(s));

//map(双列集合)
HashMap<String, Integer> map = new HashMap<>();
map.put("aa",11);
map.put("bb",22);
//1、使用keySet获取全部key
map.keySet().stream().forEach(s -> System.out.println(s));
//2、使用entrySet获取全部键值对 == 形式
map.entrySet().stream().forEach(stringIntegerEntry -> System.out.println(stringIntegerEntry));

//数组
int arr[] = {1,2,3,4,5,6,7,8,9};
Arrays.stream(arr).forEach(s -> System.out.println(s));

//零散数据类型
Stream.of("a","b","c","d","e").forEach(s -> System.out.println(s));

public static void main(String[] args) {

User u1 = new User(1,"a",10);
User u2 = new User(2,"b",11);
User u3 = new User(3,"c",12);
User u4 = new User(4,"d",13);
User u5 = new User(5,"e",14);
User u6 = new User(6,"f",15);
User u7 = new User(7,"g",16);

List<User> list = Arrays.asList(u1, u2, u3, u4, u5, u6, u7);

list.stream() //转为流 + 链式编程
.filter((User user) -> {return user.getId()%2==0;}) //id为偶数
.filter(user -> {return user.getAge() > 11;}) //age>11
.map(user -> {return user.getName().toUpperCase();}) //name转为大写
.sorted((U1,U2)->{return U2.compareTo(U1);}) //倒序
.limit(1) //输出一条记录
.forEach(System.out::println);
}


中间方法

  • 将流合成到对应的数据 .collect(Collectors.toList());

image-20231006210502809


filter

  • Stream流只能使用一次
//list(单列集合)
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list,"aa","bb","cc","cd","ce");
//如果返回值为true,表示当前数据留下
//如果返回值为fa1se,表示当前数据舍弃不要
list.stream().filter(new Predicate<String>() {
@Override
//90 122-25=97
public boolean test(String s) {
return s.startsWith("a") ;
}
}).forEach(s -> System.out.println(s));


//过滤
list.stream().filter(s -> {return s.startsWith("a") ;})
.forEach(s -> System.out.println(s));

limit

//获取前n个数据
list.stream().limit(3).forEach(s -> {
System.out.println(s);
});

skip

//跳过前n个数据
list.stream().skip(3).forEach(s -> System.out.println(s));

concat

Stream.concat(stringStream,stream)

map

ArrayList<String> list = new ArrayList<>();
Collections.addAll(list,"aa-1","bb-2","cc-3","cd-4","ce-5");
list.stream().map(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
//0-为-前数据 1为0后数据
String[] split = s.split("-");
int age = Integer.parseInt(split[1]);
return age;
}
}).forEach(System.out::println);

list.stream().map( s -> Integer.parseInt(s.split("-")[1])).forEach(System.out::println);



//转为对象
Stream.concat(stringStream,stream).map(new Function<String, Actor>() {
@Override
public Actor apply(String s) {
String name = s.split(",")[0];
int age = Integer.parseInt(s.split(",")[1]);
return new Actor(name,age);
}
}).forEach(System.out::println);

Stream.concat(stringStream,stream).map(s -> new Actor(s.split(",")[0],Integer.parseInt(s.split(",")[1]))).forEach(System.out::println);


总结方法

image-20231006213829628

//toMap  ==  键规则,值规则
Map<String, String> collect = list.stream()
.collect(Collectors.toMap(
s -> s.split("-")[0],
s -> s.split("-")[1]
));
System.out.println(collect);

//将字符串转为数组并转换类型和排序
String[] split = stringBuilder.toString().split("-");
Integer[] integers = Arrays.stream(split).map(Integer::parseInt).sorted().toArray(Integer[]::new);

并行流计算

parallel()

LongStream 是一个用于处理长整数的流。

.parallel():这个方法调用将 LongStream 转换为一个并行流(parallel stream)。并行流允许同时在多个处理器核心上进行操作,以提高处理速度。

.reduce(0, Long::sum):这一部分使用 reduce 操作来计算流中所有元素的和。reduce 接受两个参数:初始值和一个用于累积操作的二元运算符。

public static void test3() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();

//并行流 -- 范围计算
long sum = LongStream.range(1L, 10_0000_0000L).parallel().reduce(0, Long::sum);

long end = System.currentTimeMillis();
System.out.println("sum = "+sum+" 执行时间 = "+(end-start));
}





ForkJoin

  • 分支合并

image-20230927010653829


ForkJoin特点:工作窃取

双端队列

例如:B已经执行完,而A没有执行完,B会窃取A的任务来执行



image-20230927110056007

1、forkjoinPool通过它来执行l

2、计算任务forkjoinPool.execute(ForkJoinTask task)


image-20230927110459015

1、选择递归任务RecursiveTask:继承RecursiveTask,实现compute方法

public class ForkjoinTest extends RecursiveTask<Long> {

private Long start;
private Long end;
//临界值
private Long temp = 10000L;

public ForkjoinTest(Long start, Long end) {
this.start = start;
this.end = end;
}

//计算方法
@Override
protected Long compute() {
Long sum = 0L;

if ((end-start)<temp){
for (Long i = start; i <end; i++) {
sum+=i;
}
return sum;

}else { //forkjoin //递归思想,无限拆分任务

long middle = (start + end)/2; //中间值
ForkjoinTest task1 = new ForkjoinTest(start, middle);
task1.fork(); //拆分任务,把任务压入线程队列
ForkjoinTest task2 = new ForkjoinTest(middle + 1, end);
task2.fork(); //拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}
}


2、先定义ForkJoinPool,使用submsit提交任务,再通过get获得运行结果

public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();

//使用forkjoin
ForkJoinPool forkJoinPool = new ForkJoinPool();
//计算任务
ForkJoinTask<Long> task = new ForkjoinTest(1L,10_0000_0000L);
//提交任务
ForkJoinTask<Long> submsit = forkJoinPool.submit(task);
//获取结果
Long sum = submit.get();

long end = System.currentTimeMillis();






异步回调

  • Future

异步执行

成功回调

失败回调

public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {

//没有返回值的 runAsync 异步回调
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("异步调用");
});
completableFuture.get(); //获取阻塞执行的结果


//有返回值的异步回调
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+"supplyAsync=>completableFuture");
int s =1/0;
return 1024;
});

System.out.println("流程线程1");
completableFuture.whenComplete((t,u)->{
System.out.println("t=>"+t); //t:正常返回结果
System.out.println("u=>"+u); //u:错误信息
}).exceptionally((throwable -> {
throwable.printStackTrace();
return 404;
})).get();

System.out.println("流程线程2");
}
}






JMM

JMM:Java内存模型,不存在的东西,概念!约定!

关于JMM的一些同步的约定:

1、线程解锁前,必须把共享变量立刻刷回主存。

2、线程加锁前,必须读取主存中的最新值到工作内存中!

3、加锁和解锁是同一把锁

线程、工作内存、主内存

8种操作:(读、加载)(使用、返回)(存储、写)(加锁、解锁)

image-20230927140418597


内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)

  • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态

  • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定

  • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用

  • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中

  • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令

  • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中

  • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用

  • write  (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中



  JMM对这八种指令的使用,制定了如下规则:

  • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write

  • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存

  • 不允许一个线程将没有assign的数据从工作内存同步回主内存

  • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作

  • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁

  • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值

  • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量

  • 对一个变量进行unlock操作之前,必须把此变量同步回主内存






Volatile+原子引用

Volatile是Java虚拟机提供轻量级的同步机制

  • 例如线程错误复现
private static int num = 0;	//不加 volatile 为不可见

public static void main(String[] args) {

new Thread(()->{
while (num == 0){
//线程1 对内存的变化不知道
}
}).start();
TimeUnit.SECONDS.sleep(1);

num = 1; //其他线程更改内存的资源
}


1、保证可见性

  • 例如:线程1先读取主内存的值,线程2后读,但线程2修改资源并写入了主内存,线程1不知道,如果以这个值为运行条件则一直运行下去
// 内存中的值使用 volatile 修饰实现  可见性

private volatile static int num = 0;


2、不保证原子性

// 不冲突的情况下是 结果必为 20000,当多个线程同时操作,结果<20000
private volatile static int num = 0; //加上volatile也不保证原子性

public static void add(){ num++; } // 不是一个原子性操作

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int i1 = 0; i1 < 1000; i1++) { add();}
}).start();
}
//判断存活线程
while (Thread.activeCount()>2){
Thread.yield();
}
}

如果不加Iock和synchronized,怎么样保证原子性?

  • 使用原子类中的包装类,解决原子性问题(synchronized、Iock外的方案)

image-20230927145730191

private static AtomicInteger num = new AtomicInteger();

public static void add(){
num.getAndIncrement(); //AtomicInteger + 1 方法 CAS
//num++; // 不是一个原子性操作
}
  • 原因:CAS


3、禁止指令重排

  • 使用 volatile,由于内存屏障,可以保证避免指令重排的现象产生
  • 程序中的业务不是按顺序执行的,编译的时候业务程序可能会发生重排

    image-20230927152416545






单例模式

使用单例模式来创建实例,使用 **getInstance** 方法来获取实例,这样可以确保只有一个实例被创建

  1. 懒汉式

    • 创建时机:懒汉式单例在第一次使用时才会被创建,也就是说,它是延迟初始化的。即实例在需要的时候才被创建。
    • 线程安全性:最简单的懒汉式实现可能不是线程安全的,多个线程可以同时检测到实例为 null,然后尝试创建多个实例。为了确保线程安全,需要引入同步机制(如双重检查锁定)来防止多次实例化。
    • 懒加载:懒汉式允许懒加载,即只有在需要的时候才会创建实例,这可以节省资源,特别是在实例较大或初始化耗时较长的情况下。

  2. 饿汉式

    • 创建时机:饿汉式单例在类加载时就会被创建,无论是否实际使用。即实例在类加载时就被创建好。
    • 线程安全性:饿汉式单例天生线程安全,因为实例在类加载时就已经创建,不需要担心多线程并发创建多个实例的问题。
    • 不懒加载:饿汉式不支持懒加载,因为实例在类加载时就已经存在。这可能会浪费一些内存,特别是在实例很大或初始化很复杂的情况下。


饿汉式

  • 一上来就全部加载资源,无论是否使用,实例在类加载时就创建了,会造成资源浪费
  • 饿汉式单例的优点是它在多线程环境下是线程安全的,因为实例在类加载时就创建了,不需要考虑线程同步问题。
//饿汉式单例
//一上来就全部加载
public class Hungry {

//构造器私有
//防止外部代码通过构造新实例来绕过单例的限制。构造器私有化确保只有单例类内部可以创建实例。
private Hungry(){}

private final static Hungry HUNGRY = new Hungry();

//getInstance 获取单例类的实例
public static Hungry getInstance(){
return HUNGRY;
}
}


懒汉式

image-20230927185312053

1–1:可破坏性的单例模式①:多个线程可以同时检测到实例为 null,然后尝试创建多个实例

public class LazyMan {
private LazyMan(){ }
private volatile static LazyMan lazyMan;
public static LazyMan getInstance(){
if (lazyMan==null){
lazyMan=new LazyMan();
}
return lazyMan;
}

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
LazyMan.getInstance();
}).start();
}
}
}

1-2:解决多线程破坏单例:引入同步机制(如双重检查锁定)来防止多次实例化

public static LazyMan getInstance(){
if (lazyMan==null){
synchronized (LazyMan.class){
if (lazyMan==null){
lazyMan=new LazyMan();
}
}
}
return lazyMan;
}


2–1:可破坏性的单例模式②:尽管实现了引入同步机制(如双重检查锁定)来防止多次实例化,还是可以通过反射来破坏,创建多个实例破坏单例模式

LazyMan lazyMan1 = new LazyMan();
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);//反射空参构造
declaredConstructor.setAccessible(true); //关闭安全检测--无视私有
LazyMan lazyMan2 = declaredConstructor.newInstance();

2-2:解决额外反射创建实例破坏单例:局限于先使用单例模式创建实例再使用反射创建实例,如果都是通过反射创建实例,则又被破坏

private LazyMan(){
synchronized (LazyMan.class){
if (lazyMan!=null){
throw new RuntimeException("禁止使用反射破坏单例");
}
}
}

2-3:解决全用反射创建实例破坏单例:定义变量标志位控制判断(红绿灯)

private static boolean chen = false;	//隐藏标志位
private LazyMan(){
synchronized (LazyMan.class){
if (chen == false){
chen = true;
}else{
throw new RuntimeException("禁止使用反射破坏单例");
}
}
}

2-4:反射破坏隐藏变量再进行破坏单例:破解标志位

Field chen1 = LazyMan.class.getDeclaredField("chen");	//根据知道的标志位去获取
chen1.setAccessible(true); //去取标志位的所有属性检测

Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);//空参构造
declaredConstructor.setAccessible(true); //关闭构造器安全检测--无视私有
LazyMan lazyMan1 = declaredConstructor.newInstance(); //反射创建
chen1.set(lazyMan1,false); //创建完让标志位再初始化

LazyMan lazyMan2 = declaredConstructor.newInstance(); //继续反射创建


最佳单例:枚举

在枚举实现的单例模式中,单例实例通常是一个枚举的常量,例如 INSTANCE,而不需要显式调用 getInstance 方法来获取实例。这是因为枚举的特性保证了在类加载时,常量会被初始化,从而实现了单例。

public enum EnumTest {
INSTANCE;

public EnumTest getInstance(){
return INSTANCE;
}

public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
EnumTest instance = EnumTest.INSTANCE;

//反射报错
Constructor<EnumTest> declaredConstructor = EnumTest.class.getDeclaredConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
EnumTest enumTest = declaredConstructor.newInstance();
}
}





CAS

CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!

类似乐观锁

Unsafe:调用底层

AtomicInteger atomicInteger = new AtomicInteger(2023);

//期望和更新,到达期望值即更新
//如果我期望的值达到了,那么就更新,否则,就不更新,CAS是CPU的并发原语
atomicInteger.compareAndSet(2023, 999);
System.out.println(atomicInteger.get());

缺点:

1、循环会耗时

2、一次性只能保证一个共享变量的原子性

3、ABA问题

image-20230927193452037

image-20230927195142926

image-20230927195227984



ABA问题

aba不解决资源会被提前挪用,这不是我们希望的

  • A期望是1
  • 在A进入未修改时,速度快,B也期望是1,改为了3,又期望是3,改为了1
  • A以为还是1,没有更改过,然后更改为2
  • 虽然A的期望值最后没有变化,但中间的时候已经发生了变化

image-20230927202217104

AtomicInteger atomicInteger = new AtomicInteger(2023);

//===========================捣乱B线程
atomicInteger.compareAndSet(2023, 2020);
atomicInteger.compareAndSet(2020, 2023);

//===========================期望A线程
atomicInteger.compareAndSet(2023, 999);


原子引用解决

解决:和乐观锁原理相同,判断版本号

使用原子引用解决

image-20230927210105622

public class CAS_Demo {
public static void main(String[] args) {
//AtomicInteger atomicInteger = new AtomicInteger(2023);
//原始值 , 版本号
//AtomicStampedReference注意:如果泛型是一个包装类,注意对象的引用问题(如Integer的范围,所以原始值和版本号不能超过)
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);

//B线程:中途修改
new Thread(()->{
//获得版本号
int integer = atomicStampedReference.getStamp();

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

//期望值,修改值,当前版本号,更新版本号
atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
//期望值,修改值,当前版本号,更新版本号
atomicStampedReference.compareAndSet(2, 1, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
},"B").start();


//A
new Thread(()->{
//获得版本号
int integer = atomicStampedReference.getStamp();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}

//期望值,修改值,当前版本号,更新版本号
atomicStampedReference.compareAndSet(1, 9, integer, integer + 1);
},"A").start();

}
}





可重入锁

image-20230927210734195


synchronized:获得sms的锁,但sms也调用call了,对于也获得了call的锁—–可重入锁

public class CRS_1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}

class Phone{
public synchronized void sms(){
call();
}
public synchronized void call(){ }
}


ReentrantLock

  • 上锁获得两把钥匙:解对应的lock.unlock();
  • 锁必须配对,一个锁lock.lock();即要解锁一次lock.unlock();
public class CRS_1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}

class Phone{
Lock lock = new ReentrantLock();
public void sms(){
//锁必须配对,一个锁即要解锁一次lock.unlock();
lock.lock(); //上锁获得两把钥匙
try {
//一把钥匙解锁当前业务lock.unlock();
System.out.println(Thread.currentThread().getName()+"=>"+" sms");
call();//一把钥匙解锁call的lock.unlock();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"=>"+" call");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}





自旋锁

  • spinlock:不断尝试,直到成功

image-20230927195227984


  • 自定义
public class SpinlockDemo {

AtomicReference<Thread> atomicReference = new AtomicReference<>();

//加锁
public void myLock(){
Thread thread = Thread.currentThread();//获取当前线程
System.out.println(Thread.currentThread().getName() + "=====>mylock");

//自旋锁 T1进来是拿到锁,是期望的null,变成thread,这时候线程T2拿到锁后是thraed开始自旋,等待T1解锁变回null,T2才能走出自旋
while (!atomicReference.compareAndSet(null,thread)){

}
}


//解锁
public void myUnLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"=====>myUnlock");
atomicReference.compareAndSet(thread,null);//如果是期望的thread,设置为空
}

}
  • Test
public class SpinLockTest {
public static void main(String[] args) throws InterruptedException {
//java自旋锁
// ReentrantLock reentrantLock = new ReentrantLock();
// reentrantLock.lock();
// reentrantLock.unlock();

//自定义自旋锁,底层CAS
//T1进来是拿到锁,是期望的null,变成thread,这时候线程T2拿到锁后是thraed开始自旋,等待T1解锁变回null,T2才能走出自旋
SpinlockDemo lock = new SpinlockDemo();

new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.myUnLock();
}
},"T1").start();

TimeUnit.SECONDS.sleep(1);

new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.myUnLock();
}
},"T2").start();

}
}





死锁

  • 代码复现
public class DeadLockDemo {
public static void main(String[] args) {
String lockA= "lockA";
String lockB= "lockB";
new Thread(new MyThread(lockA,lockB),"T1").start();
new Thread(new MyThread(lockB,lockA),"T2").start();
}
}

class MyThread implements Runnable{
private String lockA;
private String lockB;

public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}

@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+" lock "+lockA+"=>get "+lockB);

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

synchronized (lockB){
System.out.println(Thread.currentThread().getName()+" lock "+lockB+"=>get"+lockA);

}
}

}
}

解决问题 JPS

1、查看进程号 :jps -l

2、查看进程信息:jstack 进程号






实践

秒杀

image-20231018234547435


超卖问题

  • 1、使用数据库的乐观锁

进入购买,库存 - 1

【判断条件:使用乐观锁,使用版本号思想,在更新库存的时候判断查询时候的版本号是否一致,防止查询后修改前有事务进入修改,如一致 == 库存 -1,生成订单号】

CAS】:类似判断版本号:既然需要更新前判断和查询的的版本是否一致,直接判断库存是否一致即可:但会导致不一致的事务不成功,则很多事务不成功,卖出少,解决方法:判断库存 > 0】

存在一人多单情况

image-20231018234252865





一人一单

image-20231018234744657

  • 锁住同一id:

每次调用userId.toString都是一个全新的对象,但使用intern后:直接去字符串常量池寻找值一样的地址,如果每次的值都一样,则返回的地址一样

image-20231018235824230

Long userId = ....getId()
synchronized (userId.toString().intern()){
//查询订单

//是否存在

//扣减库存

//生成订单
}

image-20231019000216584

  • 悲观锁:要先获取锁,提交事务,再释放锁 才能线程安全