From d43e7df934cc481b0107dfd3dffe130037414018 Mon Sep 17 00:00:00 2001 From: Mingmin Chen Date: Sat, 7 Apr 2018 18:07:44 -0700 Subject: [PATCH] simplify ResetOffset --- internal/consumer/partitionConsumer.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/internal/consumer/partitionConsumer.go b/internal/consumer/partitionConsumer.go index 4a043d1..3ee2d6e 100644 --- a/internal/consumer/partitionConsumer.go +++ b/internal/consumer/partitionConsumer.go @@ -401,20 +401,18 @@ func (p *rangePartitionConsumer) offsetLoop() { // ResetOffset will reset the consumer offset for the specified partition. func (p *rangePartitionConsumer) ResetOffset(offsetRange kafka.OffsetRange) error { - if offsetRange.HighOffset == -1 { - return errors.New("rangePartitionConsumer expects a highwatermark when resetting offset") - } - // If it has been closed, return error. select { - case _, ok := <-p.stopC: - if !ok { - return errors.New("unable to ResetOffset for rangePartitionConsumer has been closed") - } - case <-time.After(time.Nanosecond): + case <-p.stopC: + return errors.New("unable to ResetOffset for rangePartitionConsumer has been closed") + default: break } + if offsetRange.HighOffset == -1 { + return errors.New("rangePartitionConsumer expects a highwatermark when resetting offset") + } + select { case p.offsetRangeC <- offsetRange: return nil