Skip to content
This repository was archived by the owner on Nov 15, 2024. It is now read-only.

Commit ef340e1

Browse files
authored
Merge pull request #24 from ps-dev/adapt1-1533
Adapt1 1533 | Use offset-streams to check lag in internal topic
2 parents 4d6d7d3 + 5052eda commit ef340e1

2 files changed

Lines changed: 64 additions & 6 deletions

File tree

ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ trait ConsumerGroupsAlgebra[F[_]] {
3333

3434
def getAllConsumers: F[List[ConsumerTopics]]
3535

36+
def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]]
37+
3638
def getAllConsumersByTopic: F[List[TopicConsumers]]
3739

3840
def startConsumer: F[Unit]
@@ -96,6 +98,7 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe
9698

9799
override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup"
98100

101+
override def getOffsetsForInternalConsumerGroup: IO[List[PartitionOffset]] = ???
99102
}
100103

101104
object TestConsumerGroupsAlgebra {
@@ -148,11 +151,41 @@ object ConsumerGroupsAlgebra {
148151

149152
for {
150153
consumerGroupsStorageFacade <- Ref[F].of(ConsumerGroupsStorageFacade.empty)
154+
consumerGroupsOffsetFacade <- Ref[F].of(ConsumerGroupsOffsetFacade.empty)
151155
} yield new ConsumerGroupsAlgebra[F] {
152156

153157
override def getConsumersForTopic(topicName: String): F[TopicConsumers] =
154158
consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName)))
155159

160+
161+
override def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]] = {
162+
163+
for {
164+
groupOffsetsFromOffsetStream <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset())
165+
166+
// TODO: To be optimized
167+
largestOffsets <- kAA.getLatestOffsets(dvsConsumersTopic.value)
168+
.map(_.map(k => PartitionOffset
169+
(
170+
k._1.partition,
171+
groupOffsetsFromOffsetStream.getOrElse(k._1.partition, 0),
172+
k._2.value,
173+
-1
174+
)).toList)
175+
176+
offsetsWithLag = largestOffsets
177+
.map(
178+
k => PartitionOffset
179+
(
180+
k.partition,
181+
k.groupOffset,
182+
k.largestOffset,
183+
k.largestOffset - k.groupOffset
184+
)
185+
)
186+
}yield offsetsWithLag
187+
}
188+
156189
private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = {
157190
val detailedF: F[List[Consumer]] = topicConsumers.consumers.traverse { consumer =>
158191
val fState = getConsumerActiveState(consumer.consumerGroupName)
@@ -179,6 +212,7 @@ object ConsumerGroupsAlgebra {
179212
ConsumerGroupsOffsetConsumer.start(kafkaClientAlgebra, kAA, sra, uniquePerNodeConsumerGroup, consumerOffsetsOffsetsTopicConfig,
180213
kafkaInternalTopic, dvsConsumersTopic, bootstrapServers, commonConsumerGroup, kafkaClientSecurityConfig)
181214
}
215+
_ <- Concurrent[F].start(consumeOffsetStreamIntoCache(offsetStream, consumerGroupsOffsetFacade))
182216
} yield ()
183217
}
184218

@@ -233,6 +267,17 @@ object ConsumerGroupsAlgebra {
233267
.makeRetryableWithNotification(Infinite, "ConsumerGroupsAlgebra")
234268
.compile.drain
235269
}
270+
271+
private def consumeOffsetStreamIntoCache[F[_] : ContextShift : ConcurrentEffect : Timer : Logger](
272+
offsetStream: fs2.Stream[F, Either[Throwable, (Partition, Offset)]],
273+
consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade]
274+
)(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = {
275+
276+
offsetStream.evalTap {
277+
case Right((partition, offset)) => consumerGroupsOffsetFacade.update(_.addOffset(partition, offset))
278+
case _ => Logger[F].error("Error in consumeOffsetStreamIntoCache")
279+
}.compile.drain
280+
}
236281
}
237282

238283
private case class ConsumerGroupsStorageFacade(consumerMap: Map[TopicConsumerKey, TopicConsumerValue]) {
@@ -268,14 +313,15 @@ private object ConsumerGroupsStorageFacade {
268313

269314
private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) {
270315

271-
def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = {
272-
val res = this.copy(this.offsetMap + (key -> value))
273-
println(this.offsetMap)
274-
res
275-
}
316+
def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade =
317+
this.copy(this.offsetMap + (key -> value))
318+
319+
def getAllPartitionOffset(): Map[Partition, Offset] =
320+
this.offsetMap
276321

277322
def removeOffset(key: Partition): ConsumerGroupsOffsetFacade =
278-
this.copy(this.offsetMap - key)
323+
this.copy(this.offsetMap - key)
324+
279325
}
280326

281327
private object ConsumerGroupsOffsetFacade {

ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ class ConsumerGroupsEndpoint[F[_]: Futurable](consumerGroupsAlgebra: ConsumerGro
3434
addHttpMetric("", StatusCodes.InternalServerError, "/v2/consumer-groups", startTime, method.value, error = Some(exception.getMessage))
3535
complete(StatusCodes.InternalServerError, exception.getMessage)
3636
}
37+
} ~ pathPrefix("hydra-internal-topic") {
38+
val startTime = Instant.now
39+
pathEndOrSingleSlash {
40+
onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalConsumerGroup)) {
41+
case Success(detailedConsumer) =>
42+
addHttpMetric("hydra-internal-topic", StatusCodes.OK, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value)
43+
complete(StatusCodes.OK, detailedConsumer)
44+
case Failure(exception) =>
45+
addHttpMetric("hydra-internal-topic", StatusCodes.InternalServerError, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value, error = Some(exception.getMessage))
46+
complete(StatusCodes.InternalServerError, exception.getMessage)
47+
}
48+
}
3749
} ~ pathPrefix(Segment) { consumerGroupName =>
3850
val startTime = Instant.now
3951
pathEndOrSingleSlash {

0 commit comments

Comments
 (0)