ConsumerRecordsrecords = consumer.poll(100);
/** * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have * subscribed to any topics or partitions before polling for data. ** On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last * consumed offset can be manually set through {
@link #seek(TopicPartition, long)} or automatically set as the last committed * offset for the subscribed list of partitions * * * @param timeout The time, in milliseconds, spent waiting in poll if data is not available in the buffer. * If 0, returns immediately with any records that are available currently in the buffer, else returns empty. * Must not be negative. * @return map of topic to records since the last fetch for the subscribed list of topics and partitions * * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of * partitions is undefined or out of range and no offset reset policy has been configured * @throws org.apache.kafka.common.errors.WakeupException if { @link #wakeup()} is called before or while this * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed * topics or to the configured groupId * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. 
invalid groupId or * session timeout, errors deserializing key/value pairs, or any new error cases in future versions) * @throws java.lang.IllegalArgumentException if the timeout value is negative * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any * partitions to consume from */ @Override public ConsumerRecordspoll(long timeout) { try { if (timeout < 0) throw new IllegalArgumentException("Timeout must not be negative"); if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); // poll for new data until the timeout expires long start = time.milliseconds(); long remaining = timeout; do { Map >> records = pollOnce(remaining); //pollOnce if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. if (fetcher.sendFetches() > 0) { //为了省时间,预先放fetch一次 client.pollNoWakeup(); } if (this.interceptors == null) return new ConsumerRecords<>(records); else return this.interceptors.onConsume(new ConsumerRecords<>(records)); //如果有interceptors,先处理一下 } long elapsed = time.milliseconds() - start; remaining = timeout - elapsed; //在超时内,反复尝试poll } while (remaining > 0); return ConsumerRecords.empty(); //如果数据不ready,返回empty } finally { release(); } }
pollOnce
/** * Do one round of polling. In addition to checking for new data, this does any needed offset commits * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined). * @param timeout The maximum time to block in the underlying call to { @link ConsumerNetworkClient#poll(long)}. * @return The fetched records (may be empty) */ private Map>> pollOnce(long timeout) { coordinator.poll(time.milliseconds()); //和ConsuemrCoordinator之间的心跳 // fetch positions if we have partitions we're subscribed to that we // don't know the offset for if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions()); //同步offset // if data is available already, return it immediately Map >> records = fetcher.fetchedRecords(); //已经有fetched if (!records.isEmpty()) return records; //直接返回 // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); //没有现成的数据,发送fetch命令 long now = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); client.poll(pollTimeout, now, new PollCondition() { @Override public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); } }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.needRejoin()) return Collections.emptyMap(); return fetcher.fetchedRecords(); }
看下fetcher
public Fetcher(ConsumerNetworkClient client, int minBytes, int maxBytes, int maxWaitMs, int fetchSize, int maxPollRecords, boolean checkCrcs, DeserializerkeyDeserializer, Deserializer valueDeserializer, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs) {
创建时,
// Build the Fetcher from the consumer configuration. Arguments are positional and line up with the
// Fetcher constructor parameters shown above.
this.fetcher = new Fetcher<>(this.client,
        config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),           // minBytes
        config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),           // maxBytes
        config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),         // maxWaitMs
        config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), // fetchSize
        config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),          // maxPollRecords
        config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),            // checkCrcs
        this.keyDeserializer,
        this.valueDeserializer,
        this.metadata,
        this.subscriptions,
        metrics,
        metricGrpPrefix,
        this.time,
        this.retryBackoffMs);
fetcher.fetchedRecords
/** * Return the fetched records, empty the record buffer and update the consumed position. * * NOTE: returning empty records guarantees the consumed position are NOT updated. * * @return The fetched records per partition * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE */ public Map>> fetchedRecords() { Map >> drained = new HashMap<>(); int recordsRemaining = maxPollRecords; //最大poll records数 while (recordsRemaining > 0) { if (nextInLineRecords == null || nextInLineRecords.isDrained()) { //如果nextInLineRecords是空的,没有records的 CompletedFetch completedFetch = completedFetches.poll(); //从completedFetches,fetched队列中取一个fetch if (completedFetch == null) break; nextInLineRecords = parseFetchedData(completedFetch); //parse Fetch到nextInLineRecords中 } else { TopicPartition partition = nextInLineRecords.partition; List > records = drainRecords(nextInLineRecords, recordsRemaining); //从nextInLineRecords取recordsRemaining个records if (!records.isEmpty()) { List > currentRecords = drained.get(partition); //取出partition对应的record list if (currentRecords == null) { drained.put(partition, records); //放入record list } else { // this case shouldn't usually happen because we only send one fetch at a time per partition, // but it might conceivably happen in some rare cases (such as partition leader changes). // we have to copy to a new list because the old one may be immutable List > newRecords = new ArrayList<>(records.size() + currentRecords.size()); newRecords.addAll(currentRecords); newRecords.addAll(records); drained.put(partition, newRecords); } recordsRemaining -= records.size(); } } } return drained; //返回 }
可以看到fetchedRecords只是从已经完成的fetch中读取数据
fetcher.sendFetches
先看
createFetchRequests
/** * Create fetch requests for all nodes for which we have assigned partitions * that have no existing requests in flight. */ private MapcreateFetchRequests() { // create the fetch info Cluster cluster = metadata.fetch(); Map > fetchable = new LinkedHashMap<>(); for (TopicPartition partition : fetchablePartitions()) { Node node = cluster.leaderFor(partition); //找到partition的leader所在node if (node == null) { metadata.requestUpdate(); } else if (this.client.pendingRequestCount(node) == 0) { //如果没有正在进行的fetch,一个partition同时只能有一个fetch请求 // if there is a leader and no in-flight requests, issue a new fetch LinkedHashMap fetch = fetchable.get(node); if (fetch == null) { fetch = new LinkedHashMap<>(); fetchable.put(node, fetch); } long position = this.subscriptions.position(partition); fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); //创建FetchRequest,position,从哪儿开始读,fetchSize,读多少 log.trace("Added fetch request for partition {} at offset {}", partition, position); } else { log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node); } } // create the fetches Map requests = new HashMap<>(); for (Map.Entry > entry : fetchable.entrySet()) { Node node = entry.getKey(); FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue()); //封装成FetchRequest requests.put(node, fetch); } return requests; }
/** * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have * an in-flight fetch or pending fetch data. * @return number of fetches sent */ public int sendFetches() { MapfetchRequestMap = createFetchRequests(); //创建Fetch Request for (Map.Entry fetchEntry : fetchRequestMap.entrySet()) { final FetchRequest request = fetchEntry.getValue(); final Node fetchTarget = fetchEntry.getKey(); client.send(fetchTarget, ApiKeys.FETCH, request) //send request .addListener(new RequestFutureListener () { @Override public void onSuccess(ClientResponse resp) { //如果成功 FetchResponse response = (FetchResponse) resp.responseBody(); Set partitions = new HashSet<>(response.responseData().keySet()); for (Map.Entry entry : response.responseData().entrySet()) { TopicPartition partition = entry.getKey(); long fetchOffset = request.fetchData().get(partition).offset; FetchResponse.PartitionData fetchData = entry.getValue(); completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator)); //把fetchData封装成CompletedFetch,加入completedFetcheslist } sensors.fetchLatency.record(resp.requestLatencyMs()); sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); } @Override public void onFailure(RuntimeException e) { log.debug("Fetch request to {} failed", fetchTarget, e); } }); } return fetchRequestMap.size(); }
client.send
ConsumerNetworkClient
/** * Send a new request. Note that the request is not actually transmitted on the * network until one of the { @link #poll(long)} variants is invoked. At this * point the request will either be transmitted successfully or will fail. * Use the returned future to obtain the result of the send. Note that there is no * need to check for disconnects explicitly on the { @link ClientResponse} object; * instead, the future will be failed with a { @link DisconnectException}. * @param node The destination of the request * @param api The Kafka API call * @param request The request payload * @return A future which indicates the result of the send. */ public RequestFuturesend(Node node, ApiKeys api, AbstractRequest request) { return send(node, api, ProtoUtils.latestVersion(api.id), request); } private RequestFuture send(Node node, ApiKeys api, short version, AbstractRequest request) { long now = time.milliseconds(); RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); RequestHeader header = client.nextRequestHeader(api, version); ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler); //封装成client request put(node, clientRequest); //没有真正发出,而是放入list // wakeup the client in case it is blocking in poll so that we can send the queued request client.wakeup(); return completionHandler.future; } private void put(Node node, ClientRequest request) { synchronized (this) { List nodeUnsent = unsent.get(node); if (nodeUnsent == null) { nodeUnsent = new ArrayList<>(); unsent.put(node, nodeUnsent); } nodeUnsent.add(request); } }
NetworkClient.wakeup
/**
 * Wake the client up if it is currently blocked waiting on I/O, by delegating to the selector's wakeup.
 */
@Override
public void wakeup() {
    selector.wakeup();
}
wakeup就是让client从selector的block等待中,被唤醒,可以处理其他的请求
这里说了,只有当poll被调用的时候,才会真正的将request发送出去,poll是在哪儿被调用的?
在上面pollOnce的时候,有这样的逻辑
// Excerpt from pollOnce: poll the network client for at most pollTimeout ms.
client.poll(pollTimeout, now, new PollCondition() {
    @Override
    public boolean shouldBlock() {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    }
});
意思是调用poll的超时是pollTimeout,
PollCondition.shouldBlock,意思是何时我们需要block等待,当hasCompletedFetches时,是不需要等数据的,所以只有当没有现成的数据的时候,才需要等
ConsumerNetworkClient.poll
/** * Poll for any network IO. * @param timeout timeout in milliseconds * @param now current time in milliseconds */ public void poll(long timeout, long now, PollCondition pollCondition) { // there may be handlers which need to be invoked if we woke up the previous call to poll firePendingCompletedRequests(); synchronized (this) { // send all the requests we can send now trySend(now); // check whether the poll is still needed by the caller. Note that if the expected completion // condition becomes satisfied after the call to shouldBlock() (because of a fired completion // handler), the client will be woken up. if (pollCondition == null || pollCondition.shouldBlock()) { // if there are no requests in flight, do not block longer than the retry backoff if (client.inFlightRequestCount() == 0) timeout = Math.min(timeout, retryBackoffMs); client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now); now = time.milliseconds(); } else { client.poll(0, now); } // handle any disconnects by failing the active requests. note that disconnects must // be checked immediately following poll since any subsequent call to client.ready() // will reset the disconnect status checkDisconnects(now); // trigger wakeups after checking for disconnects so that the callbacks will be ready // to be fired on the next call to poll() maybeTriggerWakeup(); // throw InterruptException if this thread is interrupted maybeThrowInterruptException(); // try again to send requests since buffer space may have been // cleared or a connect finished in the poll trySend(now); // fail requests that couldn't be sent if they have expired failExpiredRequests(now); } // called without the lock to avoid deadlock potential if handlers need to acquire locks firePendingCompletedRequests(); }
trySend
private boolean trySend(long now) { // send any requests that can be sent now boolean requestsSent = false; for (Map.Entry> requestEntry: unsent.entrySet()) { // 前面send的时候时候,request放入unsent Node node = requestEntry.getKey(); Iterator iterator = requestEntry.getValue().iterator(); while (iterator.hasNext()) { ClientRequest request = iterator.next(); if (client.ready(node, now)) { // Begin connecting to the given node, return true if we are already connected and ready to send to that node client.send(request, now); // 调用send,发送request iterator.remove(); requestsSent = true; } } } return requestsSent; }
NetworkClient.send
/**
 * Queue up the given request for sending. Requests can only be sent out to ready nodes.
 *
 * @param request The request
 * @param now The current timestamp
 */
@Override
public void send(ClientRequest request, long now) {
    // requests arriving through this entry point are never flagged as internal
    final boolean isInternalRequest = false;
    doSend(request, isInternalRequest, now);
}
private void doSend(ClientRequest request, boolean isInternalRequest, long now) { String nodeId = request.destination(); if (request.header().apiKey() == ApiKeys.API_VERSIONS.id) { if (!canSendApiVersionsRequest(nodeId)) throw new IllegalStateException("Attempt to send API Versions request to node " + nodeId + " which is not ready."); } else if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); Send send = request.body().toSend(nodeId, request.header()); InFlightRequest inFlightRequest = new InFlightRequest( request.header(), request.createdTimeMs(), request.destination(), request.callback(), request.expectResponse(), isInternalRequest, send, now); this.inFlightRequests.add(inFlightRequest); // 加入inFlightRequest selector.send(inFlightRequest.send); }
最终用selector.send来发送Send
/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 *
 * @param send The request to send
 */
public void send(Send send) {
    String connectionId = send.destination();

    // a connection that is already closing cannot take new sends; record the failure and stop
    if (closingChannels.containsKey(connectionId)) {
        this.failedSends.add(connectionId);
        return;
    }

    // resolve the KafkaChannel registered for this connection id
    KafkaChannel channel = channelOrFail(connectionId, false);
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(connectionId);
        close(channel, false);
    }
}
KafkaChannel.setSend
/**
 * Stage a send on this channel and register write interest on the transport layer.
 * Nothing is written here; only one send may be staged at a time.
 *
 * @param send The send to stage
 * @throws IllegalStateException if a previously staged send is still in progress
 */
public void setSend(Send send) {
    boolean sendInProgress = this.send != null;
    if (sendInProgress) {
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    }

    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
可以看到selector.send也只是把send放到channel中,
真正发送要等到调用NetworkClient.poll
在ConsumerNetworkClient.poll中,
// Excerpt from ConsumerNetworkClient.poll: block only when the caller's condition allows it.
if (pollCondition == null || pollCondition.shouldBlock()) {
    // if there are no requests in flight, do not block longer than the retry backoff
    if (client.inFlightRequestCount() == 0) {
        timeout = Math.min(timeout, retryBackoffMs);
    }
    client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
    now = time.milliseconds();
} else {
    client.poll(0, now);
}
如果需要block或没有pollCondition,选择block timeout来等待数据
否则调用client.poll(0, now),意思是没有数据即刻返回
Selector.poll(由NetworkClient.poll调用)
@Override public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }
select
/**
 * Check for ready keys, waiting at most the given time.
 *
 * @param ms maximum time to wait in milliseconds; 0 selects without blocking
 * @return the value returned by the underlying selector's select call
 * @throws IllegalArgumentException if ms is negative
 * @throws IOException if the underlying select call fails
 */
private int select(long ms) throws IOException {
    if (ms < 0L) {
        throw new IllegalArgumentException("timeout should be >= 0");
    }

    // a zero timeout maps to the non-blocking selectNow()
    return ms == 0L ? this.nioSelector.selectNow() : this.nioSelector.select(ms);
}
pollSelectionKeys
private void pollSelectionKeys(IterableselectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); KafkaChannel channel = channel(key); try { /* complete any connections that have finished their handshake (either normally or immediately) */ if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ if (channel.ready() && key.isWritable()) { Send send = channel.write(); //真正写出数据 if (send != null) { this.completedSends.add(send); } } /* cancel any defunct sockets */ if (!key.isValid()) close(channel, true); } catch (Exception e) { } } }
直接用NIO写应用,是需要勇气的