
Initial patch KAFKA-1012

Review Request #13908 - Created Aug. 30, 2013 and updated

Tejas Patil
jkreps, nehanarkhede
See KAFKA-1012 for details.
Manual testing with 3 brokers, 2 producers and 6 consumers. Existing JUnit tests pass.
Posted (Sept. 4, 2013, 11:58 p.m.)


Could this error code be renamed to something like OffsetLoadingNotCompleteCode? Arguably that would convey the meaning of the error more clearly.
  1. Agree. Will include this change in the v3 patch.
It would be good to be specific about which channel the consumer failed to establish. In this case, let's mention "Unable to establish a channel for fetching offsets with any of the live brokers in %s".format(brokers.mkString(","))
  1. Agree. Will include this change in the v3 patch.
Is it a good idea for commitOffsets() to swallow every error it encounters? commitOffsets() is a public API, and users want to use it to commit offsets on demand, manually. These users do not use auto-commit and call commitOffsets() to checkpoint offsets as often as their application logic dictates. For that use case, if commitOffsets() has not actually committed the offsets successfully, the user of the API must know about it and retry as required. Thoughts?
  1. Correct me if I am wrong: the Producer API does not expose failures to the outside world. In case of failures, the producer internally retries the failed messages, but that happens under the hood and is not visible to the caller. With the embedded producer, I could not find a way for consumers to learn about failures without modifying the producer code. As the embedded producer was a temporary hack, we refrained from modifying the producer code to expose this info. This could be handled in phase #2, i.e. using OffsetCommitRequest.
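
A minimal sketch (hypothetical names, not the patch's API) of what surfacing failures could look like: commitOffsets reports whether the commit actually succeeded, so manual checkpointing code can detect failure and retry.

    // Hypothetical sketch: report success instead of swallowing errors.
    trait OffsetCommitter {
      // Commit the given (topic, partition) -> offset map. Returns true only
      // if all offsets were durably committed, so callers can retry on false.
      def commitOffsets(offsets: Map[(String, Int), Long]): Boolean
    }

    // Caller-side retry loop for on-demand checkpointing:
    //   while (!committer.commitOffsets(current)) Thread.sleep(backoffMs)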
It is probably better to make this error message clearer as well. Something along the lines of "as offset bootstrap is still in progress on some brokers. This means leadership changed recently for the offsets topic".
  1. This is one of the points that Guozhang raised in his review comment 27.3, and now you; a strong indication that I have got to change that sloppy message :) The loading process is triggered by (a) broker startup and (b) leadership change. I tried to capture both of these in a log message, but it looked ugly because it was too big to fit on one line. Technically, 'broker startup' leads to leadership assignment, which can also be seen as a leadership change. By that argument, if we don't need to distinguish between (a) and (b), we could go with the message you suggested. Otherwise, change the last part of your suggestion to "This means leadership changed recently for the offsets topic or the broker is starting up".
Curious - why do we need to use the singleton pattern here? Shouldn't only one thread invoke KafkaServer.startup?
  1. It's made a singleton so that even if someone carelessly tries to create multiple offset managers on the same server instance, there would still be a single offset manager. I agree that in the current code this will not happen, but the penalty of having multiple copies of the offset manager is huge in terms of memory and correctness, so I made it a singleton.
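
A minimal sketch of that guard, with stand-in stubs for the real classes (all names hypothetical): double-checked locking on a volatile field ensures at most one manager is ever constructed, even under careless repeated calls.

    class OffsetManagerConfig                                // stand-in for the real config
    class OffsetManagerImpl(config: OffsetManagerConfig)     // stand-in for the real manager

    object OffsetManagerHolder {
      @volatile private var instance: OffsetManagerImpl = null

      def getInstance(config: OffsetManagerConfig): OffsetManagerImpl = {
        if (instance == null) {
          synchronized {
            if (instance == null)
              instance = new OffsetManagerImpl(config)
          }
        }
        instance
      }
    }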
This file has turned into a big blob of code. It will help if you can separate the OffsetManager trait, the DefaultOffsetManager, and the ZookeeperOffsetManager into separate files.
  1. Agree. Will include this change in the v3 patch.
I think it is best not to include any parameters in the startup() API, as it is difficult to come up with a set of parameters that would work for all possible offset managers. What might work better is a generic init API that takes a Properties object and initializes the context required for the offset manager. startup might or might not be useful if we add init(Properties); I'm not so sure.
  1. > 'include a generic init API that takes in a Properties object':
    For this, KafkaServer needs to know which offset manager type it needs to spawn, then bake the properties relevant to it and pass them to 'init'. This won't abstract things from KafkaServer, and every time we add a new offset manager, the KafkaServer code must be modified. In the current patch, things are abstracted from KafkaServer. I could not figure out a way to achieve both: (a) abstracting the offset manager type from KafkaServer and (b) making the startup arguments not implementation specific. Any suggestions?
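
One possible way to get both (a) and (b), sketched below with hypothetical names (offset.manager.class and OffsetManagerFactory are not in the patch): KafkaServer loads the concrete class by name from its config and passes the raw Properties through a generic init, so it never references an implementation type.

    import java.util.Properties

    // Hypothetical sketch: KafkaServer only ever sees the trait; the concrete
    // implementation is named in the broker config.
    trait OffsetManager {
      def init(props: Properties): Unit   // implementation-specific setup
      def startup(): Unit                 // uniform, parameter-free startup
    }

    object OffsetManagerFactory {
      // e.g. props contains: offset.manager.class=kafka.server.DefaultOffsetManager
      def create(props: Properties): OffsetManager = {
        val className = props.getProperty("offset.manager.class")
        val manager = Class.forName(className).newInstance().asInstanceOf[OffsetManager]
        manager.init(props)
        manager
      }
    }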
"Load the offsets from the logs" is not generic enough. What if the offsets are stored in a database or in custom flat files?
  1. In the current patch, whichever backend is used to store the offsets (ZK, an in-memory hash table, a database, or a custom file), offsets are always written to the logs of the offsets topic. Load serves as a way to read them from those logs into the offset manager's storage. This probably looks wasteful: if a user just wants to use ZK for offsets, writing to the logs and maintaining replicas is pure overhead. The only advantage is when switching between offset manager implementations: with the ZK-based offset manager, all offsets are in both the logs and ZK, so switching to the inbuilt offset manager is just a config change, and the broker starts populating its in-memory hash table from the logs once bounced.
    I had raised this point on the Jira (comment #2, point (2)) but have not heard anything about it. Any suggestions?
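
A minimal sketch of the dual-write scheme described above (hypothetical names): every commit is appended to the offsets topic's log in addition to the configured backend, so any backend can be rebuilt from the log after a config switch.

    // Hypothetical sketch: commits always go to the offsets topic log, plus
    // whichever backend is configured (ZK, hash table, database, flat file).
    trait OffsetBackend {
      def put(topic: String, partition: Int, offset: Long): Unit
    }

    class LoggingOffsetStore(appendToOffsetsTopicLog: (String, Int, Long) => Unit,
                             backend: OffsetBackend) {
      def putOffset(topic: String, partition: Int, offset: Long): Unit = {
        appendToOffsetsTopicLog(topic, partition, offset) // always durable in the log
        backend.put(topic, partition, offset)             // backend-specific store
      }
    }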
Agree with Sriram that this could be named differently. It will also help if we describe the purpose of each of these APIs clearly. For example, if I want to store offsets in a database, how do I know why triggerLoadOffsets is required? Is it used to bootstrap some sort of offsets cache on startup?

Also try to describe when these APIs will be invoked on the Kafka server side. That will help the user implement a specific offset manager relatively easily.
  1. If your concern is just about the naming and description, then it's easily fixable. Does 'syncOffsetsFromLogs' seem OK?
There seems to be a race condition that might overwrite a newer offset with a stale one. This can happen when a broker becomes the leader for some partition of the offsets topic. When this happens, partition.makeLeader() exposes the broker as the new leader. At that point, it can start taking in offset commit requests. An offset commit request can come in at the same time that triggerLoadOffsets() is being invoked for the same offsets partition. putOffset() will go through and update the offsets table with the new offset. It does not touch commitsWhileLoading, since loading does not have the key in it. Then the first statement in triggerLoadOffsets is executed and the offsets partition gets added to loading. It goes ahead and overwrites the offsets table with the old offset, since commitsWhileLoading was not updated by putOffset.
  1. The loading process has changed: in the newer patch, a timestamp is stored along with each offset. With that, it becomes easy to prevent such overwrites.
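
A minimal sketch of that timestamp guard (hypothetical names): the live commit path and the load-from-log path apply the same "keep the newer timestamp" rule, so a stale entry read from the log can never overwrite an offset committed while loading was in progress.

    case class OffsetAndTimestamp(offset: Long, timestamp: Long)

    class OffsetsTable {
      private val table = scala.collection.mutable.Map[(String, Int), OffsetAndTimestamp]()

      // Used by both the live commit path and the log loader; only installs
      // the value if nothing newer is already present, eliminating the race.
      def putIfNewer(key: (String, Int), value: OffsetAndTimestamp): Unit = synchronized {
        table.get(key) match {
          case Some(existing) if existing.timestamp > value.timestamp => // keep newer entry
          case _ => table(key) = value
        }
      }
    }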