Kafka – kafka consumer
Published: 2019-06-24


Everything starts from the application calling KafkaConsumer.poll:

ConsumerRecords<String, String> records = consumer.poll(100);
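For orientation, here is a minimal sketch of the poll loop this line normally lives in. The broker address, group id, topic name, and String deserializers are illustrative assumptions, not taken from this post:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo")); // assumed topic

        try {
            while (true) {
                // block at most 100 ms waiting for data, as in the line above
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
            }
        } finally {
            consumer.close();
        }
    }
}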

 

/**
 * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
 * subscribed to any topics or partitions before polling for data.
 *
 * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
 * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
 * offset for the subscribed list of partitions
 *
 * @param timeout The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
 *            If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
 *            Must not be negative.
 * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
 *
 * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
 *             partitions is undefined or out of range and no offset reset policy has been configured
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
 *             topics or to the configured groupId
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
 *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
 * @throws java.lang.IllegalArgumentException if the timeout value is negative
 * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
 *             partitions to consume from
 */
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); // pollOnce
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0) { // pipeline the next fetch ahead of time to save a round trip
                    client.pollNoWakeup();
                }

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records)); // if interceptors are configured, run them first
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed; // keep retrying poll until the timeout expires
        } while (remaining > 0);

        return ConsumerRecords.empty(); // if no data is ready, return empty
    } finally {
        release();
    }
}

 

pollOnce

/**
 * Do one round of polling. In addition to checking for new data, this does any needed offset commits
 * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
 * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
 * @return The fetched records (may be empty)
 */
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.poll(time.milliseconds()); // heartbeat with the ConsumerCoordinator

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions()); // synchronize offsets

    // if data is available already, return it immediately
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // data already fetched?
    if (!records.isEmpty())
        return records; // return it directly

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches(); // no data on hand, issue fetch requests

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}

 

Now take a look at the Fetcher. Its constructor:

public Fetcher(ConsumerNetworkClient client,
               int minBytes,
               int maxBytes,
               int maxWaitMs,
               int fetchSize,
               int maxPollRecords,
               boolean checkCrcs,
               Deserializer<K> keyDeserializer,
               Deserializer<V> valueDeserializer,
               Metadata metadata,
               SubscriptionState subscriptions,
               Metrics metrics,
               String metricGrpPrefix,
               Time time,
               long retryBackoffMs) {

And where the consumer creates it:

this.fetcher = new Fetcher<>(this.client,
        config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
        config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
        config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
        config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
        config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
        config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
        this.keyDeserializer,
        this.valueDeserializer,
        this.metadata,
        this.subscriptions,
        metrics,
        metricGrpPrefix,
        this.time,
        this.retryBackoffMs);
from which you can see which consumer configuration entry feeds each constructor argument.
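As a rough illustration, the snippet below spells that mapping out. The values shown are the documented defaults of recent 0.10.x releases, assumed here rather than quoted from the post:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetcherConfigSketch {
    // Each entry maps onto one Fetcher constructor argument above.
    static Properties fetcherRelatedConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);                 // -> minBytes
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);          // -> maxBytes
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // -> maxWaitMs
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // -> fetchSize
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);              // -> maxPollRecords
        props.put(ConsumerConfig.CHECK_CRCS_CONFIG, true);                   // -> checkCrcs
        return props;
    }
}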

 

fetcher.fetchedRecords

/**
 * Return the fetched records, empty the record buffer and update the consumed position.
 *
 * NOTE: returning empty records guarantees the consumed position are NOT updated.
 *
 * @return The fetched records per partition
 * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
 *         the defaultResetPolicy is NONE
 */
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
    int recordsRemaining = maxPollRecords; // the maximum number of records per poll

    while (recordsRemaining > 0) {
        if (nextInLineRecords == null || nextInLineRecords.isDrained()) { // nextInLineRecords is empty, no records left in it
            CompletedFetch completedFetch = completedFetches.poll(); // take one fetch from the completedFetches queue
            if (completedFetch == null)
                break;

            nextInLineRecords = parseFetchedData(completedFetch); // parse the fetch into nextInLineRecords
        } else {
            TopicPartition partition = nextInLineRecords.partition;
            List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining); // drain up to recordsRemaining records from nextInLineRecords
            if (!records.isEmpty()) {
                List<ConsumerRecord<K, V>> currentRecords = drained.get(partition); // existing record list for this partition
                if (currentRecords == null) {
                    drained.put(partition, records); // store the record list
                } else {
                    // this case shouldn't usually happen because we only send one fetch at a time per partition,
                    // but it might conceivably happen in some rare cases (such as partition leader changes).
                    // we have to copy to a new list because the old one may be immutable
                    List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                    newRecords.addAll(currentRecords);
                    newRecords.addAll(records);
                    drained.put(partition, newRecords);
                }
                recordsRemaining -= records.size();
            }
        }
    }

    return drained;
}

As you can see, fetchedRecords only reads data out of fetches that have already completed; it does no network I/O itself.
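A stripped-down toy model of this draining behavior (plain Strings instead of ConsumerRecords, a Deque instead of completedFetches; purely illustrative) makes the maxPollRecords bound and the leftover buffering explicit:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class DrainSketch {
    // Toy stand-in for Fetcher.fetchedRecords(): drain at most maxPollRecords
    // records from a queue of already-completed fetches.
    static List<String> drain(Deque<List<String>> completedFetches, int maxPollRecords) {
        List<String> drained = new ArrayList<>();
        int remaining = maxPollRecords;
        while (remaining > 0 && !completedFetches.isEmpty()) {
            List<String> fetch = completedFetches.peek(); // like nextInLineRecords
            while (remaining > 0 && !fetch.isEmpty()) {
                drained.add(fetch.remove(0)); // consume one buffered record
                remaining--;
            }
            if (fetch.isEmpty())
                completedFetches.poll(); // this fetch is fully drained, drop it
        }
        return drained;
    }

    public static void main(String[] args) {
        Deque<List<String>> buffer = new ArrayDeque<>();
        buffer.add(new ArrayList<>(Arrays.asList("r1", "r2", "r3")));
        buffer.add(new ArrayList<>(Arrays.asList("r4", "r5")));
        System.out.println(drain(buffer, 4)); // [r1, r2, r3, r4]
        System.out.println(drain(buffer, 4)); // [r5] -- leftovers wait for the next poll
    }
}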

 

fetcher.sendFetches

First, look at:

createFetchRequests
/**
 * Create fetch requests for all nodes for which we have assigned partitions
 * that have no existing requests in flight.
 */
private Map<Node, FetchRequest> createFetchRequests() {
    // create the fetch info
    Cluster cluster = metadata.fetch();
    Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
    for (TopicPartition partition : fetchablePartitions()) {
        Node node = cluster.leaderFor(partition); // find the node hosting this partition's leader
        if (node == null) {
            metadata.requestUpdate();
        } else if (this.client.pendingRequestCount(node) == 0) { // only one in-flight fetch request per node at a time
            // if there is a leader and no in-flight requests, issue a new fetch
            LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
            if (fetch == null) {
                fetch = new LinkedHashMap<>();
                fetchable.put(node, fetch);
            }

            long position = this.subscriptions.position(partition);
            fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); // position: where to start reading; fetchSize: how much to read
            log.trace("Added fetch request for partition {} at offset {}", partition, position);
        } else {
            log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
        }
    }

    // create the fetches
    Map<Node, FetchRequest> requests = new HashMap<>();
    for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
        Node node = entry.getKey();
        FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue()); // wrap into a FetchRequest
        requests.put(node, fetch);
    }
    return requests;
}

 

/**
 * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
 * an in-flight fetch or pending fetch data.
 * @return number of fetches sent
 */
public int sendFetches() {
    Map<Node, FetchRequest> fetchRequestMap = createFetchRequests(); // create the fetch requests
    for (Map.Entry<Node, FetchRequest> fetchEntry : fetchRequestMap.entrySet()) {
        final FetchRequest request = fetchEntry.getValue();
        final Node fetchTarget = fetchEntry.getKey();

        client.send(fetchTarget, ApiKeys.FETCH, request) // send the request
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) { // on success
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = request.fetchData().get(partition).offset;
                            FetchResponse.PartitionData fetchData = entry.getValue();
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator)); // wrap the fetched data as a CompletedFetch and append it to completedFetches
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                        sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        log.debug("Fetch request to {} failed", fetchTarget, e);
                    }
                });
    }
    return fetchRequestMap.size();
}

 

client.send

ConsumerNetworkClient
/**
 * Send a new request. Note that the request is not actually transmitted on the
 * network until one of the {@link #poll(long)} variants is invoked. At this
 * point the request will either be transmitted successfully or will fail.
 * Use the returned future to obtain the result of the send. Note that there is no
 * need to check for disconnects explicitly on the {@link ClientResponse} object;
 * instead, the future will be failed with a {@link DisconnectException}.
 * @param node The destination of the request
 * @param api The Kafka API call
 * @param request The request payload
 * @return A future which indicates the result of the send.
 */
public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
    return send(node, api, ProtoUtils.latestVersion(api.id), request);
}

private RequestFuture<ClientResponse> send(Node node, ApiKeys api, short version, AbstractRequest request) {
    long now = time.milliseconds();
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    RequestHeader header = client.nextRequestHeader(api, version);
    ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler); // wrap into a ClientRequest
    put(node, clientRequest); // not actually sent yet, just queued on the unsent list

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}

private void put(Node node, ClientRequest request) {
    synchronized (this) {
        List<ClientRequest> nodeUnsent = unsent.get(node);
        if (nodeUnsent == null) {
            nodeUnsent = new ArrayList<>();
            unsent.put(node, nodeUnsent);
        }
        nodeUnsent.add(request);
    }
}

 

NetworkClient.wakeup

/**
 * Interrupt the client if it is blocked waiting on I/O.
 */
@Override
public void wakeup() {
    this.selector.wakeup();
}

wakeup simply rouses the client from its blocking wait inside the selector so that it can go on to handle other requests.
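The public KafkaConsumer.wakeup() is built on this same mechanism. A common shutdown pattern (the topic name here is an assumption for illustration) uses it to break a blocking poll from another thread:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ShutdownSketch {
    public static void run(KafkaConsumer<String, String> consumer) {
        // another thread (here, the JVM shutdown hook) can break a blocking poll at any time
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            consumer.subscribe(Collections.singletonList("demo")); // assumed topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                records.forEach(r -> System.out.println(r.value()));
            }
        } catch (WakeupException e) {
            // expected on shutdown: poll() was interrupted by wakeup()
        } finally {
            consumer.close();
        }
    }
}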

 

As the javadoc notes, the request is only actually sent out when poll is invoked. So where does poll get called?

 

Back in pollOnce above, there is this logic:

client.poll(pollTimeout, now, new PollCondition() {
    @Override
    public boolean shouldBlock() {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    }
});

This means poll is invoked with a timeout of pollTimeout.

PollCondition.shouldBlock decides when we need to block and wait: if there are already completed fetches (hasCompletedFetches), there is no need to wait for data, so we only block when nothing is ready yet.

 

ConsumerNetworkClient.poll

/**
 * Poll for any network IO.
 * @param timeout timeout in milliseconds
 * @param now current time in milliseconds
 */
public void poll(long timeout, long now, PollCondition pollCondition) {
    // there may be handlers which need to be invoked if we woke up the previous call to poll
    firePendingCompletedRequests();

    synchronized (this) {
        // send all the requests we can send now
        trySend(now);

        // check whether the poll is still needed by the caller. Note that if the expected completion
        // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
        // handler), the client will be woken up.
        if (pollCondition == null || pollCondition.shouldBlock()) {
            // if there are no requests in flight, do not block longer than the retry backoff
            if (client.inFlightRequestCount() == 0)
                timeout = Math.min(timeout, retryBackoffMs);
            client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
            now = time.milliseconds();
        } else {
            client.poll(0, now);
        }

        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(now);

        // trigger wakeups after checking for disconnects so that the callbacks will be ready
        // to be fired on the next call to poll()
        maybeTriggerWakeup();

        // throw InterruptException if this thread is interrupted
        maybeThrowInterruptException();

        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        trySend(now);

        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(now);
    }

    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
}

 

trySend

private boolean trySend(long now) {
    // send any requests that can be sent now
    boolean requestsSent = false;
    for (Map.Entry<Node, List<ClientRequest>> requestEntry : unsent.entrySet()) { // send() above parked the requests in unsent
        Node node = requestEntry.getKey();
        Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
        while (iterator.hasNext()) {
            ClientRequest request = iterator.next();
            if (client.ready(node, now)) {
                // ready() begins connecting to the given node and returns true if we are already connected and ready to send
                client.send(request, now); // hand the request to NetworkClient.send
                iterator.remove();
                requestsSent = true;
            }
        }
    }
    return requestsSent;
}

 

NetworkClient.send

/**
 * Queue up the given request for sending. Requests can only be sent out to ready nodes.
 * @param request The request
 * @param now The current timestamp
 */
@Override
public void send(ClientRequest request, long now) {
    doSend(request, false, now);
}

 

private void doSend(ClientRequest request, boolean isInternalRequest, long now) {
    String nodeId = request.destination();
    if (request.header().apiKey() == ApiKeys.API_VERSIONS.id) {
        if (!canSendApiVersionsRequest(nodeId))
            throw new IllegalStateException("Attempt to send API Versions request to node " + nodeId + " which is not ready.");
    } else if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");

    Send send = request.body().toSend(nodeId, request.header());
    InFlightRequest inFlightRequest = new InFlightRequest(
            request.header(),
            request.createdTimeMs(),
            request.destination(),
            request.callback(),
            request.expectResponse(),
            isInternalRequest,
            send,
            now);

    this.inFlightRequests.add(inFlightRequest); // track it as an in-flight request
    selector.send(inFlightRequest.send);
}

Finally, selector.send is used to send the Send:

/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 * @param send The request to send
 */
public void send(Send send) {
    String connectionId = send.destination();
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false); // look up this connection's channel in the Map<String, KafkaChannel> channels
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

KafkaChannel.setSend

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

As you can see, Selector.send also just parks the Send on the channel (and registers interest in OP_WRITE).

The actual transmission has to wait until NetworkClient.poll is invoked.

In ConsumerNetworkClient.poll:

if (pollCondition == null || pollCondition.shouldBlock()) {
    // if there are no requests in flight, do not block longer than the retry backoff
    if (client.inFlightRequestCount() == 0)
        timeout = Math.min(timeout, retryBackoffMs);
    client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
    now = time.milliseconds();
} else {
    client.poll(0, now);
}

If there is no pollCondition, or blocking is required, the client blocks for up to the timeout waiting for data. Otherwise it calls client.poll(0, now), which returns immediately whether or not data has arrived.

Selector.poll (reached via NetworkClient.poll)

@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}

 

select

private int select(long ms) throws IOException {
    if (ms < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (ms == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(ms);
}

 

pollSelectionKeys

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write(); // this is where the data is actually written out
                if (send != null) {
                    this.completedSends.add(send);
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, true);
        } catch (Exception e) {
        }
    }
}

 

Writing an application directly against raw NIO takes real courage.
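To see why, here is a hedged, bare-bones sketch of a non-blocking NIO client loop (the endpoint is assumed; reconnects, partial-write handling, and the immediate-connect corner case are all trimmed). Even this toy version needs the same interest-ops dance that Selector and KafkaChannel implement above:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class NioClientSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("localhost", 9092)); // assumed endpoint
        channel.register(selector, SelectionKey.OP_CONNECT);

        ByteBuffer pending = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8));
        ByteBuffer readBuf = ByteBuffer.allocate(4096);

        while (true) {
            selector.select(100); // block up to 100 ms, like select(ms) above
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isConnectable() && channel.finishConnect())
                    key.interestOps(SelectionKey.OP_WRITE); // connected: now ask for write readiness
                if (key.isValid() && key.isWritable()) {
                    channel.write(pending); // may be a partial write!
                    if (!pending.hasRemaining())
                        key.interestOps(SelectionKey.OP_READ); // done sending, switch to reading
                }
                if (key.isValid() && key.isReadable()) {
                    readBuf.clear();
                    if (channel.read(readBuf) < 0)
                        return; // peer closed the connection
                }
            }
        }
    }
}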

Reposted from: http://lciso.baihongyu.com/
