当前位置: 首页 > news >正文

华星建设集团网站宁波网站推广哪家公司好

华星建设集团网站,宁波网站推广哪家公司好,网络客服怎么样,wordpress背景图片下载目录 主要特点源码解读(1)初始化(2)创建工作线程并启动(3)线程调度和执行任务主要流程任务封装 ScheduledFutureTask延时工作队列 DelayedWorkQueue初始化添加任务 offer获取任务 poll / take 小顶堆是什么…

目录

  • 主要特点
  • 源码解读
    • (1)初始化
    • (2)创建工作线程并启动
    • (3)线程调度和执行任务
        • 主要流程
        • 任务封装 `ScheduledFutureTask`
        • 延时工作队列 DelayedWorkQueue
          • 初始化
          • 添加任务 offer
          • 获取任务 poll / take
  • 小顶堆是什么?
    • 概念
    • 堆的典型操作
    • 常用计算公式
    • 典型应用场景

ScheduledThreadPoolExecutor 支持以下任务,

(1)延迟执行;

(2)固定速率(scheduleAtFixedRate);

(3)固定延迟(scheduleWithFixedDelay);

主要特点

(1)基于 ThreadPoolExecutor 扩展,复用线程资源;

(2)工作队列使用 延迟队列**(DelayedWorkQueue)**:基于二叉堆(最小堆)的无界优先级队列,按任务到期时间排序;

(2)任务封装**(ScheduledFutureTask)**:

封装 RunnableCallable,记录下次执行时间(time)、周期(period)等。

重写 run() 方法,任务重新入队,实现周期性任务的重调度逻辑**;**

本文将对源码进行解读,指导 ScheduledThreadPoolExecutor 线程池的使用,

由于 ScheduledThreadPoolExecutor 是基于 ThreadPoolExecutor 线程池扩展实现的,核心线程调度和执行逻辑依然由 ThreadPoolExecutor 实现,需要先了解 ThreadPoolExecutor 的运行机制:Java线程池底层是怎么创建和运行的?(源码阅读)

源码解读

(1)初始化

这里使用 Executors 创建 ScheduledThreadPoolExecutor 线程池,指定 核心线程数 corePoolSize

private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);// java.util.concurrent.Executors
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}

进入构造函数,注意此处最大线程数默认为 Integer.MAX_VALUE,该参数实际无作用。

因为工作队列使用的是 DelayedWorkQueue 无界队列,是一个基于二叉堆结构的****优先级队列,用于管理定时任务,可以自动扩容不会被填满,因此线程数会始终保持 corePoolSizekeepAliveTime 也默认设置为0即可。

// ScheduledThreadPoolExecutor.class// 构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,// 使用的工作队列是内部类 ScheduledThreadPoolExecutor.DelayedWorkQueuenew DelayedWorkQueue());
}// 实际是进入父类 java.util.concurrent.ThreadPoolExecutor 构造器,进行线程池初始化
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

(2)创建工作线程并启动

execute/submit 提交任务,主要流程:

(1)将任务ScheduledFutureTask加入工作队列 DelayedWorkQueue 中;

(2)若工作线程数小于 corePoolSize ,创建新工作线程并启动;(ensurePrestart方法)

// 重写了execute方法, 实际调用 schedule 方法
public void execute(Runnable command) {schedule(command, 0, NANOSECONDS);
}// 重写了submit方法, 实际调用 schedule 方法
public <T> Future<T> submit(Callable<T> task) {return schedule(task, 0, NANOSECONDS);
}// 封装 Runnable 任务并提交工作队列
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();// Runnable 包装为 ScheduledThreadPoolExecutor.ScheduledFutureTask 任务,无返回值RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));// 提交延时任务delayedExecute(t);return t;
}// 封装 Callable 任务并提交工作队列
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {if (callable == null || unit == null)throw new NullPointerException();// Callable 包装为 ScheduledThreadPoolExecutor.ScheduledFutureTask 任务,带返回值RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));// 提交延时任务delayedExecute(t);return t;
}private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())// 若线程池已关闭, 执行拒绝策略 - 父类 ThreadPoolExecutor 的方法reject(task);else {// 将任务加入工作队列super.getQueue().add(task);// ..if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 小于核心线程数,创建新的工作线程ensurePrestart();}
}

父类 java.util.concurrent.ThreadPoolExecutor 中用到的方法,

// 根据ctl判断非运行状态
public boolean isShutdown() {return ! isRunning(ctl.get());
}// 执行拒绝策略
final void reject(Runnable command) {handler.rejectedExecution(command, this);
}// 小于核心线程数,新增工作线程
void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);
}

(3)线程调度和执行任务

主要流程

(1)工作线程仍然是调用 ThreadPoolExecutor.runWorker 方法实现任务调度执行,详见 Java线程 ThreadPoolExecutor 源码部分 -(3);

(2)区别在于工作队列为 DelayedWorkQueue ,获取任务需要达到 执行时间 ScheduledFutureTask.time;

(3)任务ScheduledFutureTask若为周期性任务,执行完后,通过运行周期ScheduledFutureTask.period, 计算并设置下次执行时间ScheduledFutureTask.time并重新加入工作队列,实现周期性执行。

任务封装 ScheduledFutureTask

首先看看,内部任务类 ScheduledThreadPoolExecutor.ScheduledFutureTask

用于封装需要定时或周期性执行的任务。它实现了 RunnableScheduledFuture 接口,结合了 Runnable(可执行任务)和 Future(异步计算结果)的特性,同时支持任务调度逻辑。

// 内部类 ScheduledFutureTask,表示定时任务
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 触发时间 nanos(纳秒)private long time;// 任务提交时的序号,用于在多个任务具有相同 time 时定义执行顺序(先进先出),在下面CompareTo方法中有用private final long sequenceNumber;// 任务的周期时间(纳秒)private final long period;/** The actual task to be re-enqueued by reExecutePeriodic */RunnableScheduledFuture<V> outerTask = this;// 任务在 DelayedWorkQueue 堆数组中的下标位置,后续进/出 工作队列时会更新int heapIndex;// 构造器,上面schedule()方法中,包装Runnable/Callable时使用到了ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}// run() 实现public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)// (1)非周期一次性任务:直接执行ScheduledFutureTask.super.run();// (2)周期性任务:执行任务并重置任务状态,设置下次执行时间并重新加入工作队列else if (ScheduledFutureTask.super.runAndReset()) {// 设置下次执行的时间setNextRunTime();// 任务重新加入工作队列reExecutePeriodic(outerTask);}}/*** 是否是周期性任务* Returns {@code true} if this is a periodic (not a one-shot) action.*/public boolean isPeriodic() {return period != 0;}/*** 设置下次执行时间*/private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}// 实现 Delayed 接口, 返回当前任务剩余延迟时长public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}// 实现任务优先级比较方法://     (1)首先按 time 排序(执行时间早的任务优先)。//     (2)如果 time 相同,则按 sequenceNumber 排序(先提交的任务优先)public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}
}
延时工作队列 DelayedWorkQueue
初始化

数据结构:基于数组的小顶堆。(小顶堆 知识点温习 见文末)

通过最小堆****管理任务顺序,使得任务按照执行时间先后执行,插入/删除的 ***O(***log n) 复杂度 保证了高吞吐场景下的性能;

static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {// 数组存储,初始长度 16,表示一个 “基于数组的最小堆”private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private int size = 0;private Thread leader = null;// 当队列为空时,工作线程通过 available.await() 等待;// 添加新任务时,通过 available.signal() 唤醒等待线程private final Condition available = lock.newCondition();...
}
添加任务 offer
/*** 添加任务 (核心)**/public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;// 容量不足, 扩容 50%if (i >= queue.length)grow();size = i + 1;if (i == 0) {// 首个元素直接加入queue[0] = e;setIndex(e, 0);} else {// 根据 ScheduledFutureTask.compareTo 大小关系构造小顶堆siftUp(i, e);}// 若更新了堆顶元素 queue[0] ,唤醒 available 上所有等待执行任务的工作线程Workerif (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock();}return true;}public boolean add(Runnable e) {return offer(e);}public boolean offer(Runnable e, long timeout, TimeUnit unit) {return offer(e);}/*** 扩容 50%*/private void grow() {int oldCapacity = queue.length;int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%if (newCapacity < 0) // overflownewCapacity = Integer.MAX_VALUE;queue = Arrays.copyOf(queue, newCapacity);}/*** 更新 ScheduledFutureTask.heapIndex , 存储在堆数组的下标*/private void setIndex(RunnableScheduledFuture<?> f, int idx) {if (f instanceof ScheduledFutureTask)((ScheduledFutureTask)f).heapIndex = idx;}/*** 根据任务的CompareTo优先级关系,构造小顶堆。即优先执行的任务越靠前。*/private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {// 获取父节点下标 = (k - 1) / 2int parent = (k - 1) >>> 1;RunnableScheduledFuture<?> e = queue[parent];// 若大于父节点,符合小顶堆特点,直接返回if (key.compareTo(e) >= 0)break;// 若小于父节点,替换父节点,继续构造小顶堆queue[k] = e;setIndex(e, k);k = parent;} // 设置下标位置到 ScheduledFutureTask 任务的 heapIndex 中 queue[k] = key;setIndex(key, k);}
获取任务 poll / take

poll(long timeout, TimeUnit unit) 非阻塞获取任务,允许超时;

    /*** 限时 获取堆顶元素**/public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)throws InterruptedException {// 等待超时时间 nanoslong nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 获取堆顶任务 queue[0]RunnableScheduledFuture<?> first = queue[0];// 当前没有任务,阻塞等待超时时间 - available 条件变量阻塞if (first == null) {if (nanos <= 0)return null;elsenanos = available.awaitNanos(nanos);} else {// 当前有任务long delay = first.getDelay(NANOSECONDS);// 任务到达执行时间 time,finishPoll 取出堆顶任务后调整堆结构if (delay <= 0)return finishPoll(first);// 任务还未到执行时间time,    if (nanos <= 0)return null;first = null; // don't retain ref while waiting// 超时时间不足以等待任务剩余延迟时间 delay// 或者有其他工作线程 leader 在等待执行堆顶任务// 直接等待超时时间结束if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);else {// 当前工作线程设置为leader,阻塞等待剩余延迟时间 delay,准备执行任务Thread thisThread = Thread.currentThread();leader = thisThread;try {long timeLeft = available.awaitNanos(delay);nanos -= delay - timeLeft;} finally {// 等待结束,移除 leader 身份标记if (leader == thisThread)leader = null;}}}}} finally {// 堆顶任务不为空,且没有 leader 工作线程在等待执行,唤醒所有工作线程if (leader == null && queue[0] != null)available.signal();lock.unlock();}}/*** Performs common bookkeeping for poll and take: Replaces* first element with last and sifts it down.  Call only when* holding lock.* @param f the task to remove and return*/private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {// 更新 size - 1int s = --size;// 拿出最后一个元素 x(优先级最低-即执行时间time最晚的任务)RunnableScheduledFuture<?> x = queue[s];// 将最后一个元素的位置置空queue[s] = null;// 剩余元素数量不为0,重新调整堆if (s != 0)siftDown(0, x);setIndex(f, -1);return f;}// 取出堆顶任务后,重新调整小顶堆private void siftDown(int k, RunnableScheduledFuture<?> key) {int half = size >>> 1;while (k < half) {// k的左子节点下标 = (k * 2) + 1int child = (k << 1) + 1;RunnableScheduledFuture<?> c = queue[child];// k的右子节点下标 = (k * 2) + 2int right = child + 1;// 取出左右子节点中较小任务if (right < size && c.compareTo(queue[right]) > 0)c = queue[child = right];// 若 待插入任务 key 小于左右子节点,表示找到了该插入的位置,直接退出if (key.compareTo(c) <= 0)break;// 否则将较小子节点移动到父节点,继续往下查找queue[k] = c;setIndex(c, k);k = child;}// 设置下标位置到 ScheduledFutureTask 任务的 heapIndex 中 queue[k] = key;setIndex(key, k);}// 非等待获取堆顶任务public RunnableScheduledFuture<?> poll() {final ReentrantLock lock = this.lock;lock.lock();try {RunnableScheduledFuture<?> first = queue[0];if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn finishPoll(first);} finally {lock.unlock();}}

take () 阻塞获取堆顶任务(直到任务到期或线程中断)

    // 阻塞获取堆顶任务(直到任务到期或线程中断)public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)// 没有任务,阻塞等待,直到堆顶 queue[0] 更新了新任务被唤醒available.await();else {// 到达了任务执行时间,直接返回任务long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);first = null; // don't retain ref while waiting// leader 不为空,证明有其他工作线程在等待执行了,继续阻塞if (leader != null)available.await();else {// 没有leader,当前工作线程作为leader,限时阻塞等待delay时间,准备执行任务Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}

小顶堆是什么?

概念

(1) 什么是堆?堆是一种完全二叉树。

(2) 什么是完全二叉树?完全二叉树表示除最后一层外,其余各层的节点数都达到最大值(即满的)的二叉树。

       1/   \2     3        / \   /    4  5  6            <-- 最后一层可以不满,但必须都靠左。可用数组表示: [123456]1/   \2     3        / \      \  4  5       6      <-- 最后一层非都靠左,不是完全二叉树

完全二叉树的节点按层次顺序紧密排列,适合用数组高效存储和遍历

(3) 什么是最大堆? 最小堆?

  • 最大堆(大顶堆):每个父节点的值 ≥ 其子节点的值(根节点是最大值)。
  • 最小堆**(小顶堆)**:每个父节点的值 ≤ 其子节点的值(根节点是最小值)。
  • 二叉搜索树BST)的区别:
    • BST:左子树节点 < 父节点 < 右子树节点,用于快速查找。
    • :仅保证父节点与子节点的大小关系,不维护左右子树的顺序。堆的用途是快速获取极值(最大值或最小值)。

堆的典型操作

(1)插入(Insert):将新元素放到末尾,通过“上浮”(与父节点比较并交换)调整位置。

(2)删除根节点(Extract-Max/Min):移除根节点,将末尾元素移到根位置,通过“下沉”(与子节点比较并交换)调整。

(3)时间复杂度:插入和删除均为 **O(**log n),获取极值(即根节点)为 O(1)

常用计算公式

(1)高度公式:h=logN + 1,其中 N 是节点总数;

(2)完全二叉树可以用 数组 高效存储(起始下标0):

  • 计算子节点索引:若父节点索引为 i,则左子节点为 2i+1,右子节点为 2i+2;
  • 计算父节点索引:若子节点索引为 k,父节点索引为 (k-1)/2;

典型应用场景

(1)优先队列:任务调度、Dijkstra最短路径算法。

(2)堆排序:通过反复提取极值实现排序(时间复杂度 O(n log n))。

(3)动态极值维护:如实时统计流数据中的Top K元素。

http://www.cadmedia.cn/news/8346.html

相关文章:

  • 怎样下载建设部网站百度官网入口
  • 广告素材网站都有哪些在线培训系统app
  • 企业网站推广的重要性市场营销案例分析及解答
  • 如何策划网络事件营销长沙网站seo收费
  • 驻马店营销型网站建设百度关键词多少钱一个月
  • 江西龙峰建设集团的网站网上营销怎么做
  • 巨人科技网站建设麒麟seo外推软件
  • 湖口网站建设百度app官网下载安装
  • 电影网站建设步骤全网搜索指数
  • 凡科网站建设网站武汉做网络推广的公司
  • 管理培训机构合肥seo排名优化公司
  • 江阴高端网站建设济宁做网站的电话
  • 网站集群建设方案app推广方式有哪些
  • 十堰做网站最专业的公司朋友圈广告
  • 企业网站免费软文台
  • 广东建设信息网查询成绩网页搜索优化seo
  • 注册公司名称用什么名字好镇江网站seo
  • 昆明电商网站开发理发培训专业学校
  • 大连高端网站建设文员短期电脑培训
  • 新华书店的做的数字阅读网站产品推广方案范文
  • 网站源码建站湖南网站营销seo多少费用
  • 宁波疫情防控最新政策武汉百度推广优化
  • 建设网站的公司兴田德润怎么联系鞍山网络推广
  • phpmysql网站开发笔记来几个关键词兄弟们
  • 邢台优化网站排名百度地图推广怎么收费标准
  • 宁夏商擎网站建设合肥百度推广公司哪家好
  • 深圳建工建设集团有限公司杭州seo网络推广
  • 安徽专业网站建设设计百度指数搜索
  • 湘潭做网站价格 d磐石网络关键词歌曲免费听
  • 建设局特种作业网站南京seo关键词优化预订