Skip to content

Commit

Permalink
[PI-5013] do not cache write client to force timeline server restart …
Browse files Browse the repository at this point in the history
…before write (#22)
  • Loading branch information
psendyk authored Jan 17, 2025
1 parent aba2433 commit 1bda901
Showing 1 changed file with 1 addition and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ class HoodieStreamingSink(sqlContext: SQLContext,

private var asyncCompactorService: AsyncCompactService = _
private var asyncClusteringService: AsyncClusteringService = _
private var writeClient: Option[SparkRDDWriteClient[_]] = Option.empty
private var hoodieTableConfig: Option[HoodieTableConfig] = Option.empty

override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
Expand Down Expand Up @@ -138,7 +137,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
}
}))), writeClient)
}))), Option.empty[SparkRDDWriteClient[_]])
)
match {
case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
Expand All @@ -149,7 +148,6 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}))
log.info(s"Current value of latestCommittedBatchId: $latestCommittedBatchId. Setting latestCommittedBatchId to batchId $batchId.")
latestCommittedBatchId = batchId
writeClient = Some(client)
hoodieTableConfig = Some(tableConfig)
if (client != null) {
metaClient = Some(HoodieTableMetaClient.builder()
Expand Down Expand Up @@ -314,11 +312,6 @@ class HoodieStreamingSink(sqlContext: SQLContext,
asyncClusteringService.shutdown(force)
asyncClusteringService = null
}

if (writeClient.isDefined) {
writeClient.get.close()
writeClient = Option.empty
}
}

private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
Expand Down

0 comments on commit 1bda901

Please sign in to comment.