Preface
This article takes a look at Eureka's TaskDispatcher.
PeerEurekaNode
public class PeerEurekaNode {

    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                          HttpReplicationClient replicationClient, EurekaServerConfig config) {
        this(registry, targetHost, serviceUrl, replicationClient, config,
                BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
    }

    /* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                     HttpReplicationClient replicationClient,
                                     EurekaServerConfig config,
                                     int batchSize,
                                     long maxBatchingDelayMs,
                                     long retrySleepTimeMs,
                                     long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;

        this.serviceUrl = serviceUrl;
        this.config = config;
        this.maxProcessingDelayMs = config.getMaxTimeForReplication();

        String batcherName = getBatcherName();
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                batcherName,
                config.getMaxElementsInPeerReplicationPool(),
                batchSize,
                config.getMaxThreadsForPeerReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                targetHost,
                config.getMaxElementsInStatusReplicationPool(),
                config.getMaxThreadsForStatusReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
    }

    //......
}
- statusUpdate
    /**
     * Send the status information of the ASG represented by the instance.
     *
     * ASG (Autoscaling group) names are available for instances in AWS and the
     * ASG information is used for determining if the instance should be
     * registered as {@link InstanceStatus#DOWN} or {@link InstanceStatus#UP}.
     *
     * @param asgName
     *            the asg name if any of this instance.
     * @param newStatus
     *            the new status of the ASG.
     */
    public void statusUpdate(final String asgName, final ASGStatus newStatus) {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        nonBatchingDispatcher.process(
                asgName,
                new AsgReplicationTask(targetHost, Action.StatusUpdate, asgName, newStatus) {
                    public EurekaHttpResponse<?> execute() {
                        return replicationClient.statusUpdate(asgName, newStatus);
                    }
                },
                expiryTime
        );
    }
The ASG status update task is submitted to the nonBatchingDispatcher.
- cancel
    public void cancel(final String appName, final String id) throws Exception {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        batchingDispatcher.process(
                taskId("cancel", appName, id),
                new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                    @Override
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.cancel(appName, id);
                    }

                    @Override
                    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                        super.handleFailure(statusCode, responseEntity);
                        if (statusCode == 404) {
                            logger.warn("{}: missing entry.", getTaskName());
                        }
                    }
                },
                expiryTime
        );
    }
Methods like cancel are submitted to the batchingDispatcher.
ReplicationTask
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/ReplicationTask.java
/**
 * Base class for all replication tasks.
 */
abstract class ReplicationTask {

    private static final Logger logger = LoggerFactory.getLogger(ReplicationTask.class);

    protected final String peerNodeName;
    protected final Action action;

    ReplicationTask(String peerNodeName, Action action) {
        this.peerNodeName = peerNodeName;
        this.action = action;
    }

    public abstract String getTaskName();

    public Action getAction() {
        return action;
    }

    public abstract EurekaHttpResponse<?> execute() throws Throwable;

    public void handleSuccess() {
    }

    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        logger.warn("The replication of task {} failed with response code {}", getTaskName(), statusCode);
    }
}
This is the base class for all replication tasks.
InstanceReplicationTask
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/InstanceReplicationTask.java
/**
 * Base {@link ReplicationTask} class for instance related replication requests.
 *
 * @author Tomasz Bak
 */
public abstract class InstanceReplicationTask extends ReplicationTask {

    /**
     * For cancel request there may be no InstanceInfo object available so we need to store app/id pair
     * explicitly.
     */
    private final String appName;
    private final String id;

    private final InstanceInfo instanceInfo;
    private final InstanceStatus overriddenStatus;

    private final boolean replicateInstanceInfo;

    //......
}
This is the replication task for instance-related requests. In PeerEurekaNode, register, heartbeat, the instance-level statusUpdate, deleteStatusOverride and cancel all use InstanceReplicationTask and are submitted to the batchingDispatcher; only the ASG-level statusUpdate shown above uses AsgReplicationTask and goes to the nonBatchingDispatcher. A sketch of register is shown below.
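For comparison with cancel above, here is a simplified sketch of how PeerEurekaNode.register submits an InstanceReplicationTask to the batchingDispatcher; it is paraphrased from the 1.8.8 source, so the expiry-time calculation and constructor arguments may differ slightly from the actual code.

    public void register(final InstanceInfo info) throws Exception {
        // the task expires after the instance's lease renewal interval
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }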
TaskDispatcher
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatcher.java
/**
 * Task dispatcher takes task from clients, and delegates their execution to a configurable number of workers.
 * The task can be processed one at a time or in batches. Only non-expired tasks are executed, and if a newer
 * task with the same id is scheduled for execution, the old one is deleted. Lazy dispatch of work (only on demand)
 * to workers, guarantees that data are always up to date, and no stale task processing takes place.
 *
 * <h3>Task processor</h3>
 * A client of this component must provide an implementation of {@link TaskProcessor} interface, which will do
 * the actual work of task processing. This implementation must be thread safe, as it is called concurrently by
 * multiple threads.
 *
 * <h3>Execution modes</h3>
 * To create non batched executor call {@link TaskDispatchers#createNonBatchingTaskDispatcher(String, int, int, long, long, TaskProcessor)}
 * method. Batched executor is created by {@link TaskDispatchers#createBatchingTaskDispatcher(String, int, int, int, long, long, TaskProcessor)}.
 *
 * @author Tomasz Bak
 */
public interface TaskDispatcher<ID, T> {

    void process(ID id, T task, long expiryTime);

    void shutdown();
}
The TaskDispatcher is responsible for dispatching tasks to workers. Two points are key: only tasks that have not yet expired are executed, and if a newer task is scheduled with the same id, the older task is discarded. There are two kinds of dispatchers, nonBatchingDispatcher and batchingDispatcher.
TaskDispatchers
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatchers.java
public class TaskDispatchers {

    public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
                                                                                int maxBufferSize,
                                                                                int workerCount,
                                                                                long maxBatchingDelay,
                                                                                long congestionRetryDelayMs,
                                                                                long networkFailureRetryMs,
                                                                                TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }

    public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                             int maxBufferSize,
                                                                             int workloadSize,
                                                                             int workerCount,
                                                                             long maxBatchingDelay,
                                                                             long congestionRetryDelayMs,
                                                                             long networkFailureRetryMs,
                                                                             TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
}
Two factory methods are provided, one for the nonBatchingDispatcher and one for the batchingDispatcher. For the former, the AcceptorExecutor is created with a maxBatchingSize of 1 and the TaskExecutors are created by singleItemExecutors; for the latter, the maxBatchingSize is the workloadSize passed in by the caller (PeerEurekaNode's BATCH_SIZE, 250 by default) and the TaskExecutors are created by batchExecutors. A usage sketch follows.
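To make the factory usage concrete, here is a minimal, self-contained sketch that wires a trivial TaskProcessor into a batching dispatcher. The processor, the id scheme and all numeric values are made up for illustration and are not taken from Eureka's own configuration.

    import java.util.List;

    import com.netflix.eureka.util.batcher.TaskDispatcher;
    import com.netflix.eureka.util.batcher.TaskDispatchers;
    import com.netflix.eureka.util.batcher.TaskProcessor;
    import com.netflix.eureka.util.batcher.TaskProcessor.ProcessingResult;

    public class TaskDispatcherDemo {

        // Trivial, thread-safe processor that just prints the tasks it receives.
        static class PrintingProcessor implements TaskProcessor<String> {
            @Override
            public ProcessingResult process(String task) {
                System.out.println("single task: " + task);
                return ProcessingResult.Success;
            }

            @Override
            public ProcessingResult process(List<String> tasks) {
                System.out.println("batch of " + tasks.size() + ": " + tasks);
                return ProcessingResult.Success;
            }
        }

        public static void main(String[] args) throws InterruptedException {
            TaskDispatcher<String, String> dispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                    "demo",                  // id, used for thread and metric names
                    10000,                   // maxBufferSize
                    250,                     // workloadSize, i.e. max tasks per batch
                    5,                       // workerCount
                    500,                     // maxBatchingDelay (ms)
                    1000,                    // congestionRetryDelayMs
                    100,                     // networkFailureRetryMs
                    new PrintingProcessor());

            long expiryTime = System.currentTimeMillis() + 30_000;
            // Two tasks with the same id: if the first has not been handed to a worker yet,
            // the second replaces it and only "heartbeat-2" gets processed.
            dispatcher.process("demo-app/instance-1", "heartbeat-1", expiryTime);
            dispatcher.process("demo-app/instance-1", "heartbeat-2", expiryTime);

            Thread.sleep(2000);
            dispatcher.shutdown();
        }
    }

In PeerEurekaNode the same factory is wired with a ReplicationTaskProcessor and pool sizes taken from EurekaServerConfig, as shown in the constructor at the top of this article.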
AcceptorExecutor
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/AcceptorExecutor.java
    private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
    private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();

    void process(ID id, T task, long expiryTime) {
        acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
        acceptedTasks++;
    }

    void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
        reprocessQueue.addAll(holders);
        replayedTasks += holders.size();
        trafficShaper.registerFailure(processingResult);
    }

    void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
        reprocessQueue.add(taskHolder);
        replayedTasks++;
        trafficShaper.registerFailure(processingResult);
    }
process adds the task to the acceptorQueue, while reprocess puts failed tasks back into the reprocessQueue and registers the failure with the TrafficShaper.
AcceptorRunner
        ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
        this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
        this.acceptorThread.setDaemon(true);
        this.acceptorThread.start();

    class AcceptorRunner implements Runnable {
        @Override
        public void run() {
            long scheduleTime = 0;
            while (!isShutdown.get()) {
                try {
                    drainInputQueues();

                    int totalItems = processingOrder.size();

                    long now = System.currentTimeMillis();
                    if (scheduleTime < now) {
                        scheduleTime = now + trafficShaper.transmissionDelay();
                    }
                    if (scheduleTime <= now) {
                        assignBatchWork();
                        assignSingleItemWork();
                    }

                    // If no worker is requesting data or there is a delay injected by the traffic shaper,
                    // sleep for some time to avoid tight loop.
                    if (totalItems == processingOrder.size()) {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException ex) {
                    // Ignore
                } catch (Throwable e) {
                    // Safe-guard, so we never exit this loop in an uncontrolled way.
                    logger.warn("Discovery AcceptorThread error", e);
                }
            }
        }

        //......
    }
The acceptor thread loops continuously: it drains the input queues via drainInputQueues and then, unless the TrafficShaper injects a delay, hands work out via assignBatchWork and assignSingleItemWork.
drainInputQueues
    private void drainInputQueues() throws InterruptedException {
        do {
            drainReprocessQueue();
            drainAcceptorQueue();

            if (!isShutdown.get()) {
                // If all queues are empty, block for a while on the acceptor queue
                if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                    TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                    if (taskHolder != null) {
                        appendTaskHolder(taskHolder);
                    }
                }
            }
        } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
    }
This method calls drainReprocessQueue followed by drainAcceptorQueue.
- drainAcceptorQueue
    private void drainAcceptorQueue() {
        while (!acceptorQueue.isEmpty()) {
            appendTaskHolder(acceptorQueue.poll());
        }
    }

    private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
        if (isFull()) {
            pendingTasks.remove(processingOrder.poll());
            queueOverflows++;
        }
        TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
        if (previousTask == null) {
            processingOrder.add(taskHolder.getId());
        } else {
            overriddenTasks++;
        }
    }
Tasks are drained from the acceptorQueue into pendingTasks (a map keyed by task id); new ids are appended to processingOrder, while a task whose id is already pending simply overrides the previous one.
- drainReprocessQueue
    private void drainReprocessQueue() {
        long now = System.currentTimeMillis();
        while (!reprocessQueue.isEmpty() && !isFull()) {
            TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
            ID id = taskHolder.getId();
            if (taskHolder.getExpiryTime() <= now) {
                expiredTasks++;
            } else if (pendingTasks.containsKey(id)) {
                overriddenTasks++;
            } else {
                pendingTasks.put(id, taskHolder);
                processingOrder.addFirst(id);
            }
        }
        if (isFull()) {
            queueOverflows += reprocessQueue.size();
            reprocessQueue.clear();
        }
    }
Tasks are taken from the reprocessQueue; if a task has not expired and its id is not already pending, it is put into pendingTasks and its id is added to the head of the processing order via processingOrder.addFirst(id), so retried tasks get priority.
assign work
    void assignSingleItemWork() {
        if (!processingOrder.isEmpty()) {
            if (singleItemWorkRequests.tryAcquire(1)) {
                long now = System.currentTimeMillis();
                while (!processingOrder.isEmpty()) {
                    ID id = processingOrder.poll();
                    TaskHolder<ID, T> holder = pendingTasks.remove(id);
                    if (holder.getExpiryTime() > now) {
                        singleItemWorkQueue.add(holder);
                        return;
                    }
                    expiredTasks++;
                }
                singleItemWorkRequests.release();
            }
        }
    }

    void assignBatchWork() {
        if (hasEnoughTasksForNextBatch()) {
            if (batchWorkRequests.tryAcquire(1)) {
                long now = System.currentTimeMillis();
                int len = Math.min(maxBatchingSize, processingOrder.size());
                List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                while (holders.size() < len && !processingOrder.isEmpty()) {
                    ID id = processingOrder.poll();
                    TaskHolder<ID, T> holder = pendingTasks.remove(id);
                    if (holder.getExpiryTime() > now) {
                        holders.add(holder);
                    } else {
                        expiredTasks++;
                    }
                }
                if (holders.isEmpty()) {
                    batchWorkRequests.release();
                } else {
                    batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                    batchWorkQueue.add(holders);
                }
            }
        }
    }
Here the pending tasks are pulled in processingOrder (i.e. by priority) and moved into the singleItemWorkQueue or the batchWorkQueue; tasks that have already expired are dropped along the way.
WorkerRunnable
    abstract static class WorkerRunnable<ID, T> implements Runnable {
        final String workerName;
        final AtomicBoolean isShutdown;
        final TaskExecutorMetrics metrics;
        final TaskProcessor<T> processor;
        final AcceptorExecutor<ID, T> taskDispatcher;

        WorkerRunnable(String workerName,
                       AtomicBoolean isShutdown,
                       TaskExecutorMetrics metrics,
                       TaskProcessor<T> processor,
                       AcceptorExecutor<ID, T> taskDispatcher) {
            this.workerName = workerName;
            this.isShutdown = isShutdown;
            this.metrics = metrics;
            this.processor = processor;
            this.taskDispatcher = taskDispatcher;
        }

        String getWorkerName() {
            return workerName;
        }
    }
This defines the base Runnable for the worker threads.
SingleTaskWorkerRunnable
    private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();

    static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        SingleTaskWorkerRunnable(String workerName,
                                 AtomicBoolean isShutdown,
                                 TaskExecutorMetrics metrics,
                                 TaskProcessor<T> processor,
                                 AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
                    TaskHolder<ID, T> taskHolder;
                    while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                        if (isShutdown.get()) {
                            return;
                        }
                    }
                    metrics.registerExpiryTime(taskHolder);
                    if (taskHolder != null) {
                        ProcessingResult result = processor.process(taskHolder.getTask());
                        switch (result) {
                            case Success:
                                break;
                            case Congestion:
                            case TransientError:
                                taskDispatcher.reprocess(taskHolder, result);
                                break;
                            case PermanentError:
                                logger.warn("Discarding a task of {} due to permanent error", workerName);
                        }
                        metrics.registerTaskResult(result, 1);
                    }
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }
    }
Here tasks are polled directly from the singleItemWorkQueue; each poll returns a single TaskHolder<ID, T>.
BatchWorkerRunnable
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

    static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        BatchWorkerRunnable(String workerName,
                            AtomicBoolean isShutdown,
                            TaskExecutorMetrics metrics,
                            TaskProcessor<T> processor,
                            AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);

                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }

        private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
            BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
            List<TaskHolder<ID, T>> result;
            do {
                result = workQueue.poll(1, TimeUnit.SECONDS);
            } while (!isShutdown.get() && result == null);
            return (result == null) ? new ArrayList<>() : result;
        }

        private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
            List<T> tasks = new ArrayList<>(holders.size());
            for (TaskHolder<ID, T> holder : holders) {
                tasks.add(holder.getTask());
            }
            return tasks;
        }
    }
Tasks are polled from the batchWorkQueue; unlike the single-item worker, each poll returns a List<TaskHolder<ID, T>>.
Both workers handle the ProcessingResult with the same logic, shown below:
    switch (result) {
        case Success:
            break;
        case Congestion:
        case TransientError:
            taskDispatcher.reprocess(holders, result);
            break;
        case PermanentError:
            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
    }
Congestion and TransientError results cause the tasks to be put back into the reprocess queue for retry, while PermanentError results are simply discarded with a warning log. A sketch of a TaskProcessor that produces these results follows.
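This retry behavior is driven entirely by what the TaskProcessor returns. Below is a minimal, hypothetical processor that classifies outcomes into ProcessingResult values; the mapping and the TooManyRequestsException type are invented for illustration, and Eureka's own ReplicationTaskProcessor applies its own, more detailed rules.

    import java.io.UncheckedIOException;
    import java.util.Collections;
    import java.util.List;

    import com.netflix.eureka.util.batcher.TaskProcessor;
    import com.netflix.eureka.util.batcher.TaskProcessor.ProcessingResult;

    // Hypothetical processor illustrating how the return value drives retry vs. discard.
    public class ClassifyingProcessor implements TaskProcessor<Runnable> {

        @Override
        public ProcessingResult process(Runnable task) {
            return process(Collections.singletonList(task));
        }

        @Override
        public ProcessingResult process(List<Runnable> tasks) {
            try {
                for (Runnable task : tasks) {
                    task.run();
                }
                return ProcessingResult.Success;          // nothing is re-queued
            } catch (TooManyRequestsException e) {
                return ProcessingResult.Congestion;       // whole batch re-queued; traffic shaper injects a delay
            } catch (UncheckedIOException e) {
                return ProcessingResult.TransientError;   // whole batch re-queued for retry
            } catch (RuntimeException e) {
                return ProcessingResult.PermanentError;   // batch discarded, only a warning is logged
            }
        }

        // Exception type invented for this sketch.
        static class TooManyRequestsException extends RuntimeException {
        }
    }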
Summary
Eureka ships its own TaskDispatcher design, which comes in two flavors: nonBatchingDispatcher and batchingDispatcher.
The dispatched tasks are InstanceReplicationTask subclasses of ReplicationTask. The base class defines the common properties plus two abstract methods, public abstract String getTaskName() and public abstract EurekaHttpResponse<?> execute() throws Throwable, which are implemented by anonymous classes inside PeerEurekaNode to carry out the actual register, cancel and other replication requests.
The dispatching logic schedules tasks by id and priority: a later task with the same id overrides the one still pending under that id, and if processing fails with a retriable result, the tasks are put into the reprocess queue and later re-added to pendingTasks at the head of the processing order, i.e. with the highest priority.