[case19] A look at Eureka's TaskDispatcher
Published: 2019-06-27


This article takes a look at Eureka's TaskDispatcher.

PeerEurekaNode

public class PeerEurekaNode {

    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                          HttpReplicationClient replicationClient, EurekaServerConfig config) {
        this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS,
                RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
    }

    /* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                     HttpReplicationClient replicationClient, EurekaServerConfig config,
                                     int batchSize, long maxBatchingDelayMs,
                                     long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;

        this.serviceUrl = serviceUrl;
        this.config = config;
        this.maxProcessingDelayMs = config.getMaxTimeForReplication();

        String batcherName = getBatcherName();
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                batcherName,
                config.getMaxElementsInPeerReplicationPool(),
                batchSize,
                config.getMaxThreadsForPeerReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                targetHost,
                config.getMaxElementsInStatusReplicationPool(),
                config.getMaxThreadsForStatusReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
    }

    //......
}
  • statusUpdate
    /**
     * Send the status information of of the ASG represented by the instance.
     *
     * ASG (Autoscaling group) names are available for instances in AWS and the
     * ASG information is used for determining if the instance should be
     * registered as {@link InstanceStatus#DOWN} or {@link InstanceStatus#UP}.
     *
     * @param asgName
     *            the asg name if any of this instance.
     * @param newStatus
     *            the new status of the ASG.
     */
    public void statusUpdate(final String asgName, final ASGStatus newStatus) {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        nonBatchingDispatcher.process(
                asgName,
                new AsgReplicationTask(targetHost, Action.StatusUpdate, asgName, newStatus) {
                    public EurekaHttpResponse<?> execute() {
                        return replicationClient.statusUpdate(asgName, newStatus);
                    }
                },
                expiryTime
        );
    }

This submits the task to nonBatchingDispatcher.
  • cancel
    public void cancel(final String appName, final String id) throws Exception {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        batchingDispatcher.process(
                taskId("cancel", appName, id),
                new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                    @Override
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.cancel(appName, id);
                    }

                    @Override
                    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                        super.handleFailure(statusCode, responseEntity);
                        if (statusCode == 404) {
                            logger.warn("{}: missing entry.", getTaskName());
                        }
                    }
                },
                expiryTime
        );
    }

Methods such as cancel submit their tasks to batchingDispatcher.

ReplicationTask

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/ReplicationTask.java

/**
 * Base class for all replication tasks.
 */
abstract class ReplicationTask {

    private static final Logger logger = LoggerFactory.getLogger(ReplicationTask.class);

    protected final String peerNodeName;
    protected final Action action;

    ReplicationTask(String peerNodeName, Action action) {
        this.peerNodeName = peerNodeName;
        this.action = action;
    }

    public abstract String getTaskName();

    public Action getAction() {
        return action;
    }

    public abstract EurekaHttpResponse<?> execute() throws Throwable;

    public void handleSuccess() {
    }

    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        logger.warn("The replication of task {} failed with response code {}", getTaskName(), statusCode);
    }
}

It is the base class of all replication tasks.

InstanceReplicationTask

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/InstanceReplicationTask.java

/**
 * Base {@link ReplicationTask} class for instance related replication requests.
 *
 * @author Tomasz Bak
 */
public abstract class InstanceReplicationTask extends ReplicationTask {

    /**
     * For cancel request there may be no InstanceInfo object available so we need to store app/id pair
     * explicitly.
     */
    private final String appName;
    private final String id;

    private final InstanceInfo instanceInfo;
    private final InstanceStatus overriddenStatus;

    private final boolean replicateInstanceInfo;

    //......
}

These are the replication tasks tied to an instance. In PeerEurekaNode, register, heartbeat, the instance-level statusUpdate, deleteStatusOverride and cancel all use InstanceReplicationTask and are submitted to batchingDispatcher. The ASG-level statusUpdate(asgName, newStatus) shown earlier is the exception: it uses AsgReplicationTask and is submitted to nonBatchingDispatcher.

TaskDispatcher

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatcher.java

/**
 * Task dispatcher takes task from clients, and delegates their execution to a configurable number of workers.
 * The task can be processed one at a time or in batches. Only non-expired tasks are executed, and if a newer
 * task with the same id is scheduled for execution, the old one is deleted. Lazy dispatch of work (only on demand)
 * to workers, guarantees that data are always up to date, and no stale task processing takes place.
 * <h3>Task processor</h3>
 * A client of this component must provide an implementation of {@link TaskProcessor} interface, which will do
 * the actual work of task processing. This implementation must be thread safe, as it is called concurrently by
 * multiple threads.
 * <h3>Execution modes</h3>
 * To create non batched executor call {@link TaskDispatchers#createNonBatchingTaskDispatcher(String, int, int, long, long, TaskProcessor)}
 * method. Batched executor is created by {@link TaskDispatchers#createBatchingTaskDispatcher(String, int, int, int, long, long, TaskProcessor)}.
 *
 * @author Tomasz Bak
 */
public interface TaskDispatcher<ID, T> {

    void process(ID id, T task, long expiryTime);

    void shutdown();
}

TaskDispatcher is responsible for distributing tasks. Two points matter most: only tasks that have not expired are executed, and when a newer task with the same id is scheduled, the older one is dropped. It comes in two flavors, nonBatchingDispatcher and batchingDispatcher.

TaskDispatchers

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatchers.java

public class TaskDispatchers {

    public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
                                                                             int maxBufferSize,
                                                                             int workerCount,
                                                                             long maxBatchingDelay,
                                                                             long congestionRetryDelayMs,
                                                                             long networkFailureRetryMs,
                                                                             TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }

    public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                          int maxBufferSize,
                                                                          int workloadSize,
                                                                          int workerCount,
                                                                          long maxBatchingDelay,
                                                                          long congestionRetryDelayMs,
                                                                          long networkFailureRetryMs,
                                                                          TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
}

Two factory methods are provided, one for nonBatchingDispatcher and one for batchingDispatcher. The former builds its AcceptorExecutor with maxBatchingSize fixed at 1 and creates its TaskExecutors through singleItemExecutors. The latter takes maxBatchingSize from the workloadSize argument (PeerEurekaNode passes its batchSize, 250 by default) and creates its TaskExecutors through batchExecutors.

AcceptorExecutor

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/AcceptorExecutor.java

    private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
    private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();

    void process(ID id, T task, long expiryTime) {
        acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
        acceptedTasks++;
    }

    void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
        reprocessQueue.addAll(holders);
        replayedTasks += holders.size();
        trafficShaper.registerFailure(processingResult);
    }

    void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
        reprocessQueue.add(taskHolder);
        replayedTasks++;
        trafficShaper.registerFailure(processingResult);
    }

process puts the task into acceptorQueue; reprocess puts it into reprocessQueue.

AcceptorRunner

    ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
    this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
    this.acceptorThread.setDaemon(true);
    this.acceptorThread.start();

    class AcceptorRunner implements Runnable {
        @Override
        public void run() {
            long scheduleTime = 0;
            while (!isShutdown.get()) {
                try {
                    drainInputQueues();

                    int totalItems = processingOrder.size();

                    long now = System.currentTimeMillis();
                    if (scheduleTime < now) {
                        scheduleTime = now + trafficShaper.transmissionDelay();
                    }
                    if (scheduleTime <= now) {
                        assignBatchWork();
                        assignSingleItemWork();
                    }

                    // If no worker is requesting data or there is a delay injected by the traffic shaper,
                    // sleep for some time to avoid tight loop.
                    if (totalItems == processingOrder.size()) {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException ex) {
                    // Ignore
                } catch (Throwable e) {
                    // Safe-guard, so we never exit this loop in an uncontrolled way.
                    logger.warn("Discovery AcceptorThread error", e);
                }
            }
        }

        //......
    }

The loop keeps calling drainInputQueues, then assignBatchWork and assignSingleItemWork. Work is only assigned when the delay returned by trafficShaper.transmissionDelay() is zero.

drainInputQueues

        private void drainInputQueues() throws InterruptedException {
            do {
                drainReprocessQueue();
                drainAcceptorQueue();

                if (!isShutdown.get()) {
                    // If all queues are empty, block for a while on the acceptor queue
                    if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                        TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (taskHolder != null) {
                            appendTaskHolder(taskHolder);
                        }
                    }
                }
            } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
        }

This calls drainReprocessQueue and drainAcceptorQueue.
  • drainAcceptorQueue
        private void drainAcceptorQueue() {
            while (!acceptorQueue.isEmpty()) {
                appendTaskHolder(acceptorQueue.poll());
            }
        }

        private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
            if (isFull()) {
                pendingTasks.remove(processingOrder.poll());
                queueOverflows++;
            }
            TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
            if (previousTask == null) {
                processingOrder.add(taskHolder.getId());
            } else {
                overriddenTasks++;
            }
        }

Tasks are taken out of acceptorQueue and put into pendingTasks, keyed by id. A new id is appended to processingOrder; an id that is already pending has its task replaced in place. If the buffer is full, the oldest pending task is evicted first.
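The override-and-evict behavior of appendTaskHolder can be tried in isolation. Below is a minimal sketch, not the Eureka class itself: PendingBufferSketch, its String ids and payloads, and the taskFor helper are hypothetical names introduced for illustration, and the full check is assumed to be a simple size comparison against maxBufferSize.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the buffering part of AcceptorExecutor.
class PendingBufferSketch {
    private final int maxBufferSize;
    private final Map<String, String> pendingTasks = new HashMap<>();
    private final Deque<String> processingOrder = new ArrayDeque<>();
    int queueOverflows;
    int overriddenTasks;

    PendingBufferSketch(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    void append(String id, String task) {
        if (pendingTasks.size() >= maxBufferSize) {
            // Buffer full (assumed check): evict the oldest pending id.
            pendingTasks.remove(processingOrder.poll());
            queueOverflows++;
        }
        String previous = pendingTasks.put(id, task);
        if (previous == null) {
            processingOrder.add(id); // new id: goes to the back of the line
        } else {
            overriddenTasks++;       // same id: payload replaced, position kept
        }
    }

    String taskFor(String id) {
        return pendingTasks.get(id);
    }

    public static void main(String[] args) {
        PendingBufferSketch buffer = new PendingBufferSketch(2);
        buffer.append("a", "a-v1");
        buffer.append("a", "a-v2"); // overrides a-v1
        buffer.append("b", "b-v1");
        buffer.append("c", "c-v1"); // buffer is full: "a" is evicted
        System.out.println("a -> " + buffer.taskFor("a"));
        System.out.println("overridden=" + buffer.overriddenTasks
                + ", overflows=" + buffer.queueOverflows);
    }
}
```

Keeping the payload in a map and the order in a separate deque is what lets a newer task replace an older one without losing its place in line.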
  • drainReprocessQueue
        private void drainReprocessQueue() {
            long now = System.currentTimeMillis();
            while (!reprocessQueue.isEmpty() && !isFull()) {
                TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
                ID id = taskHolder.getId();
                if (taskHolder.getExpiryTime() <= now) {
                    expiredTasks++;
                } else if (pendingTasks.containsKey(id)) {
                    overriddenTasks++;
                } else {
                    pendingTasks.put(id, taskHolder);
                    processingOrder.addFirst(id);
                }
            }
            if (isFull()) {
                queueOverflows += reprocessQueue.size();
                reprocessQueue.clear();
            }
        }

Tasks are taken out of reprocessQueue. A task that has not expired and whose id is not already pending is put into pendingTasks, and its id goes to the head of the line via processingOrder.addFirst(id).
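One detail that is easy to miss is that pollLast combined with addFirst leaves the retried tasks at the head of processingOrder in their original relative order. The sketch below illustrates this under simplifying assumptions: ReprocessSketch and Holder are hypothetical names, ids are plain strings, the current time is passed in as a parameter, and the isFull handling is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the retry path; not the Eureka class itself.
class ReprocessSketch {
    static final class Holder {
        final String id;
        final long expiryTime;

        Holder(String id, long expiryTime) {
            this.id = id;
            this.expiryTime = expiryTime;
        }
    }

    final Map<String, Holder> pendingTasks = new HashMap<>();
    final Deque<String> processingOrder = new ArrayDeque<>();

    // Takes retried holders from the tail of the retry queue and pushes each
    // surviving id onto the head of processingOrder.
    void drainReprocessQueue(Deque<Holder> reprocessQueue, long now) {
        while (!reprocessQueue.isEmpty()) {
            Holder holder = reprocessQueue.pollLast();
            if (holder.expiryTime <= now) {
                continue; // expired while waiting for a retry: drop it
            }
            if (pendingTasks.containsKey(holder.id)) {
                continue; // a newer task with this id is already pending: it wins
            }
            pendingTasks.put(holder.id, holder);
            processingOrder.addFirst(holder.id);
        }
    }

    public static void main(String[] args) {
        ReprocessSketch sketch = new ReprocessSketch();
        sketch.pendingTasks.put("x", new Holder("x", 200));
        sketch.processingOrder.add("x");

        Deque<Holder> retries = new ArrayDeque<>();
        retries.add(new Holder("r1", 200));
        retries.add(new Holder("r2", 200));
        retries.add(new Holder("x", 200));  // superseded by the pending "x"
        retries.add(new Holder("old", 50)); // already expired at now = 100

        sketch.drainReprocessQueue(retries, 100);
        System.out.println(sketch.processingOrder); // [r1, r2, x]
    }
}
```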

assign work

        void assignSingleItemWork() {
            if (!processingOrder.isEmpty()) {
                if (singleItemWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    while (!processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            singleItemWorkQueue.add(holder);
                            return;
                        }
                        expiredTasks++;
                    }
                    singleItemWorkRequests.release();
                }
            }
        }

        void assignBatchWork() {
            if (hasEnoughTasksForNextBatch()) {
                if (batchWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    int len = Math.min(maxBatchingSize, processingOrder.size());
                    List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                    while (holders.size() < len && !processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            holders.add(holder);
                        } else {
                            expiredTasks++;
                        }
                    }
                    if (holders.isEmpty()) {
                        batchWorkRequests.release();
                    } else {
                        batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                        batchWorkQueue.add(holders);
                    }
                }
            }
        }

Here tasks are taken from pendingTasks in processingOrder order and placed on singleItemWorkQueue or batchWorkQueue. Work is only handed out when a worker has requested it, and expired tasks are dropped along the way.
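The batch-building loop can be sketched on its own to show how expired entries are skipped while the batch is filled. This is an illustration only: BatchAssignSketch and nextBatch are hypothetical names, expiry times are kept in a plain map keyed by id, and the semaphore and metrics from the original are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the batch-building loop.
class BatchAssignSketch {
    // Polls ids in processing order and keeps those that have not expired,
    // until the batch holds min(maxBatchingSize, initial queue size) ids
    // or the queue runs out.
    static List<String> nextBatch(Deque<String> processingOrder,
                                  Map<String, Long> expiryById,
                                  int maxBatchingSize,
                                  long now) {
        int len = Math.min(maxBatchingSize, processingOrder.size());
        List<String> batch = new ArrayList<>(len);
        while (batch.size() < len && !processingOrder.isEmpty()) {
            String id = processingOrder.poll();
            // Assumes every queued id has an entry in expiryById.
            long expiryTime = expiryById.remove(id);
            if (expiryTime > now) {
                batch.add(id);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<String> order = new ArrayDeque<>(Arrays.asList("a", "b", "c", "d"));
        Map<String, Long> expiry = new HashMap<>();
        expiry.put("a", 50L); // expired at now = 100
        expiry.put("b", 200L);
        expiry.put("c", 200L);
        expiry.put("d", 200L);

        System.out.println(nextBatch(order, expiry, 2, 100)); // [b, c]
        System.out.println(order);                            // [d]
    }
}
```

With maxBatchingSize set to 1, the same loop degenerates to handing out one live task at a time, which is how the non-batching dispatcher is configured.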

WorkerRunnable

    abstract static class WorkerRunnable<ID, T> implements Runnable {
        final String workerName;
        final AtomicBoolean isShutdown;
        final TaskExecutorMetrics metrics;
        final TaskProcessor<T> processor;
        final AcceptorExecutor<ID, T> taskDispatcher;

        WorkerRunnable(String workerName,
                       AtomicBoolean isShutdown,
                       TaskExecutorMetrics metrics,
                       TaskProcessor<T> processor,
                       AcceptorExecutor<ID, T> taskDispatcher) {
            this.workerName = workerName;
            this.isShutdown = isShutdown;
            this.metrics = metrics;
            this.processor = processor;
            this.taskDispatcher = taskDispatcher;
        }

        String getWorkerName() {
            return workerName;
        }
    }

This defines the base Runnable class for the workers.

SingleTaskWorkerRunnable

    // In AcceptorExecutor
    private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();

    // In TaskExecutors
    static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        SingleTaskWorkerRunnable(String workerName,
                                 AtomicBoolean isShutdown,
                                 TaskExecutorMetrics metrics,
                                 TaskProcessor<T> processor,
                                 AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
                    TaskHolder<ID, T> taskHolder;
                    while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                        if (isShutdown.get()) {
                            return;
                        }
                    }
                    metrics.registerExpiryTime(taskHolder);
                    if (taskHolder != null) {
                        ProcessingResult result = processor.process(taskHolder.getTask());
                        switch (result) {
                            case Success:
                                break;
                            case Congestion:
                            case TransientError:
                                taskDispatcher.reprocess(taskHolder, result);
                                break;
                            case PermanentError:
                                logger.warn("Discarding a task of {} due to permanent error", workerName);
                        }
                        metrics.registerTaskResult(result, 1);
                    }
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }
    }

The worker polls tasks from singleItemWorkQueue, which it obtains through requestWorkItem(); each polled element is a TaskHolder<ID, T>.

BatchWorkerRunnable

    // In AcceptorExecutor
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

    // In TaskExecutors
    static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        BatchWorkerRunnable(String workerName,
                            AtomicBoolean isShutdown,
                            TaskExecutorMetrics metrics,
                            TaskProcessor<T> processor,
                            AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);

                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }

        private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
            BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
            List<TaskHolder<ID, T>> result;
            do {
                result = workQueue.poll(1, TimeUnit.SECONDS);
            } while (!isShutdown.get() && result == null);
            return (result == null) ? new ArrayList<>() : result;
        }

        private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
            List<T> tasks = new ArrayList<>(holders.size());
            for (TaskHolder<ID, T> holder : holders) {
                tasks.add(holder.getTask());
            }
            return tasks;
        }
    }

The worker polls from batchWorkQueue; unlike the single-item worker, each polled element is a List<TaskHolder<ID, T>>.

Both workers handle ProcessingResult in the same way:

                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }

On Congestion and TransientError the tasks are put back on the queue for a retry; on PermanentError they are discarded and only a warning is logged.
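The three outcomes can be exercised without any of the Eureka plumbing. The following is a rough sketch: ResultHandlingSketch, its handle method and its discarded counter are hypothetical, and the nested enum only mirrors the four result names that appear in the switch above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of how a worker reacts to a processing result.
class ResultHandlingSketch {
    // Mirrors the four result names used in the switch above.
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    final Deque<String> reprocessQueue = new ArrayDeque<>();
    int discarded;

    void handle(String task, ProcessingResult result) {
        switch (result) {
            case Success:
                break;                    // done, nothing to keep
            case Congestion:
            case TransientError:
                reprocessQueue.add(task); // worth retrying later
                break;
            case PermanentError:
                discarded++;              // retrying would not help: drop it
                break;
        }
    }

    public static void main(String[] args) {
        ResultHandlingSketch worker = new ResultHandlingSketch();
        worker.handle("register#1", ProcessingResult.Success);
        worker.handle("heartbeat#2", ProcessingResult.Congestion);
        worker.handle("cancel#3", ProcessingResult.TransientError);
        worker.handle("cancel#4", ProcessingResult.PermanentError);
        System.out.println("retry: " + worker.reprocessQueue);
        System.out.println("discarded: " + worker.discarded);
    }
}
```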

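Summary

Eureka ships its own TaskDispatcher design, in two flavors: nonBatchingDispatcher and batchingDispatcher.

The scheduled tasks are mostly InstanceReplicationTask instances, which extend ReplicationTask. ReplicationTask defines the basic fields and declares two abstract methods, public abstract String getTaskName() and public abstract EurekaHttpResponse<?> execute() throws Throwable. PeerEurekaNode creates anonymous subclasses that implement the request logic for register, cancel, and the other operations.

Scheduling is driven by task id and processing order. A later task with the same id replaces the one still waiting in pendingTasks; a task already handed to a worker is not affected. A task that fails with Congestion or TransientError goes to the retry queue and is then put back into pendingTasks at the head of processingOrder, so it is processed first.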


Reposted from: http://ucbnl.baihongyu.com/
