Thinking about Kafka idempotence: when is the producerId cleaned out of broker memory?




Background

  While reading the source code for Kafka's idempotent producer support recently, I started wondering: since idempotence is enforced by checking producerId + sequence number, the broker must be caching the producerId and sequence numbers somewhere. If a cluster runs long enough, could the accumulation of producerId and sequence-number state eventually cause an OOM? I could not find an answer online, so this article traces the source code to find out.
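
  Before diving in, here is a minimal, self-contained sketch of the idea behind broker-side idempotence, in plain Scala. This is not Kafka code: BatchMeta, IdempotenceSketch and tryAppend are made-up names, and the per-producer cache bound is just for illustration. The broker keeps, per producerId, metadata about recently appended batches and rejects a batch whose sequence range it has already seen; the open question is what, if anything, ever shrinks the outer map.

import scala.collection.mutable

// Toy stand-ins for ProducerStateManager / ProducerStateEntry; not Kafka classes.
final case class BatchMeta(firstSeq: Int, lastSeq: Int)

object IdempotenceSketch {
  // analogous to the broker's per-partition state: producerId -> cached batch metadata
  private val producers = mutable.Map.empty[Long, mutable.Queue[BatchMeta]]
  private val MaxCachedBatchesPerProducer = 5 // in this toy, keep only the last few batches per producer

  /** Returns true if the batch is appended, false if it is rejected as a duplicate. */
  def tryAppend(producerId: Long, firstSeq: Int, lastSeq: Int): Boolean = {
    val cached = producers.getOrElseUpdate(producerId, mutable.Queue.empty[BatchMeta])
    val isDuplicate = cached.exists(m => m.firstSeq == firstSeq && m.lastSeq == lastSeq)
    if (!isDuplicate) {
      cached.enqueue(BatchMeta(firstSeq, lastSeq))
      if (cached.size > MaxCachedBatchesPerProducer) cached.dequeue()
    }
    // Note: nothing in this sketch ever removes a producerId key from `producers`;
    // finding where Kafka does that is exactly what the rest of this article is about.
    !isDuplicate
  }
}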

Tracing the code

  This article does not start from the producer-side code; it jumps straight into the broker's request handling (ApiKeys) and traces from there. If the earlier parts of the flow are unfamiliar, see: 【转载】万字长文干货 | Kafka 事务性之幂等性实现

  [Step 1] kafka.server.KafkaApis#handle

def handle(request: RequestChannel.Request) {
  try {
    ...
    request.header.apiKey match {
      // handle an incoming produce request
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      ...
    }
  } catch {
    ...
  } finally {
    ...
  }
}

  

  [Step 2] kafka.server.KafkaApis#handleProduceRequest

def handleProduceRequest(request: RequestChannel.Request) {
  ...

  if (authorizedRequestInfo.isEmpty)
    ...
  else {
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

    // [Key] append the records
    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      processingStatsCallback = processingStatsCallback)

    ...
  }
}

  

  [Step 3] kafka.server.ReplicaManager#appendRecords

def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
  if (isValidRequiredAcks(requiredAcks)) {
    ...
    // [Key] append to the local log
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    ...
  } else {
    ...
  }
}

  

  [Step 4] kafka.server.ReplicaManager#appendToLocalLog

private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             isFromClient: Boolean,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  trace(s"Append [$entriesPerPartition] to local log")
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      ...
    } else {
      try {
        // get the Partition object for this topic-partition
        val partitionOpt = getPartition(topicPartition)
        val info = partitionOpt match {
          case Some(partition) =>
            if (partition eq ReplicaManager.OfflinePartition)
              throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
            // [Key] append the records to this partition's leader
            partition.appendRecordsToLeader(records, isFromClient, requiredAcks)

          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicPartition, localBrokerId))
        }

        ...
      } catch {
        ...
      }
    }
  }
}

  

  [Step 5] kafka.cluster.Partition#appendRecordsToLeader

def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        ...

        // [Key] append as leader
        val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
        ...
      case None =>
        ...
    }
  }

  ...
}

  

  [Step 6] kafka.log.Log#appendAsLeader

def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
  // [Key] append the data
  append(records, isFromClient, assignOffsets = true, leaderEpoch)
}

 

  [Step 7] kafka.log.Log#append

private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    // [Key] analyze and validate the records; further down (elided here) append also calls
    // analyzeAndValidateProducerState, which is where step 8 picks up
    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
    ...
  }
}

  

  [Step 8] kafka.log.Log#analyzeAndValidateProducerState

private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
  ...

  // iterate over all batches that carry a producerId
  for (batch <- records.batches.asScala if batch.hasProducerId) {
    // [Key] this is the producerId cache lookup; the result is a ProducerStateEntry
    val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
    ...
    if (isFromClient) {
      // [Key] findDuplicateBatch (defined on ProducerStateEntry) checks whether this batch has already been written
      maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
        return (updatedProducers, completedTxns.toList, Some(duplicate))
      }
    }

    ...
  }
  ...
}

  

  [Step 9] kafka.log.ProducerStateEntry#findDuplicateBatch

def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
  // if the batch's producerEpoch differs from the current epoch, it cannot be a duplicate
  if (batch.producerEpoch != producerEpoch)
    None
  else
    // [Key] check whether the sequence range duplicates a cached batch
    batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
}

  

  [Step 10] kafka.log.ProducerStateEntry#batchWithSequenceRange

def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
  // [Key] a batch is a duplicate only when its (firstSeq, lastSeq) exactly matches a cached batch's sequence range
  val duplicate = batchMetadata.filter { metadata =>
    firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
  }
  duplicate.headOption
}
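
  A quick worked example of the check above, using plain Scala collections rather than Kafka classes (Meta, cachedBatches and duplicateOf are made-up names): a batch counts as a duplicate only when its (firstSeq, lastSeq) pair exactly matches a cached batch; a partially overlapping range is not caught by this particular check.

case class Meta(firstSeq: Int, lastSeq: Int)          // toy stand-in for BatchMetadata
val cachedBatches = Seq(Meta(0, 9), Meta(10, 19))     // what the broker remembers for this producer

def duplicateOf(firstSeq: Int, lastSeq: Int): Option[Meta] =
  cachedBatches.find(m => m.firstSeq == firstSeq && m.lastSeq == lastSeq)

duplicateOf(10, 19)  // Some(Meta(10,19)) -> an exact retry, treated as a duplicate
duplicateOf(15, 25)  // None -> not a duplicate according to this check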

  

  So the lookup from step 8, quoted again below, is where the producerId cache lives; let's look inside it.

// [Key] this is the producerId cache lookup; the result is a ProducerStateEntry
val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)

  

  kafka.log.ProducerStateManager#lastEntry

  As you can see, producerId entries are cached in the producers map:

// producerId entries are cached in the producers map
def lastEntry(producerId: Long): Option[ProducerStateEntry] = producers.get(producerId)

 

  Let's check what type producers is. It is an in-memory map keyed by producerId (a Long), so without some form of eviction it would keep growing as new producerIds appear:

private val producers = mutable.Map.empty[Long, ProducerStateEntry]

  

  Searching the surrounding code for anything that removes producerId entries turns up the following three places:

  kafka.log.ProducerStateManager#truncateHead

def truncateHead(logStartOffset: Long) {
  val evictedProducerEntries = producers.filter { case (_, producerState) =>
    !isProducerRetained(producerState, logStartOffset)
  }

  val evictedProducerIds = evictedProducerEntries.keySet

  // [Key] remove the evicted producerIds from the cache
  producers --= evictedProducerIds
  removeEvictedOngoingTransactions(evictedProducerIds)
  removeUnreplicatedTransactions(logStartOffset)

  if (lastMapOffset < logStartOffset)
    lastMapOffset = logStartOffset

  deleteSnapshotsBefore(logStartOffset)
  lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
}
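
  Roughly, then, a producer entry is dropped once the log's start offset has moved past everything that producer has in the cache. Below is a self-contained sketch of that eviction; isProducerRetained is not shown above, so the assumption here (and ProducerState is a toy stand-in) is that a producer is kept only while it still has cached batch metadata at or beyond logStartOffset.

import scala.collection.mutable

case class ProducerState(lastOffset: Long)   // toy stand-in for ProducerStateEntry

val producers = mutable.Map[Long, ProducerState](
  1L -> ProducerState(lastOffset = 100L),
  2L -> ProducerState(lastOffset = 5000L)
)

def truncateHead(logStartOffset: Long): Unit = {
  // evict producers whose cached batches all lie before the new log start offset
  val evicted = producers.filter { case (_, state) => state.lastOffset < logStartOffset }
  producers --= evicted.keySet
}

truncateHead(1000L)   // producer 1 is evicted, producer 2 survives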

  Tracing upward:

  kafka.log.ProducerStateManager#truncateAndReload

def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
  // remove all out of range snapshots
  deleteSnapshotFiles(logDir, { snapOffset =>
    snapOffset > logEndOffset || snapOffset <= logStartOffset
  })

  if (logEndOffset != mapEndOffset) {
    producers.clear()
    ongoingTxns.clear()

    // since we assume that the offset is less than or equal to the high watermark, it is
    // safe to clear the unreplicated transactions
    unreplicatedTxns.clear()
    loadFromSnapshot(logStartOffset, currentTimeMs)
  } else {
    // [Key]
    truncateHead(logStartOffset)
  }
}

 

  kafka.log.Log#recoverSegment

    This method is only called when a Log is being initialized, so it can be ruled out.

 

  kafka.log.Log#loadProducerState

    Tracing this method upward, one caller comes from Log initialization, so that path can be ruled out.

    The other caller is kafka.log.Log#truncateTo, which fires when a follower replica truncates its log while syncing with the leader; this one is worth following.

 

  kafka.log.Log#truncateTo

private[log] def truncateTo(targetOffset: Long): Boolean = {
  maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") {
    if (targetOffset < 0)
      throw new IllegalArgumentException(s"Cannot truncate partition $topicPartition to a negative offset (%d).".format(targetOffset))
    if (targetOffset >= logEndOffset) {
      info(s"Truncating to $targetOffset has no effect as the largest offset in the log is ${logEndOffset - 1}")
      false
    } else {
      info(s"Truncating to offset $targetOffset")
      lock synchronized {
        checkIfMemoryMappedBufferClosed()
        if (segments.firstEntry.getValue.baseOffset > targetOffset) {
          truncateFullyAndStartAt(targetOffset)
        } else {
          val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
          deletable.foreach(deleteSegment)

          // [Key] truncate, then reload the producer state via loadProducerState below
          activeSegment.truncateTo(targetOffset)
          updateLogEndOffset(targetOffset)
          this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
          this.logStartOffset = math.min(targetOffset, this.logStartOffset)
          _leaderEpochCache.clearAndFlushLatest(targetOffset)
          loadProducerState(targetOffset, reloadFromCleanShutdown = false)
        }
        true
      }
    }
  }
}

 

  kafka.cluster.Partition#truncateTo

def truncateTo(offset: Long, isFuture: Boolean) {
  // The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
  // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
  inReadLock(leaderIsrUpdateLock) {
    // [Key]
    logManager.truncateTo(Map(topicPartition -> offset), isFuture = isFuture)
  }
}

  

  kafka.server.ReplicaFetcherThread#maybeTruncate

override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
  val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
  val partitionsWithError = mutable.Set[TopicPartition]()

  fetchedEpochs.foreach { case (tp, epochOffset) =>
    try {
      val replica = replicaMgr.getReplicaOrException(tp)
      val partition = replicaMgr.getPartition(tp).get
      // the leader reported an error for this epoch request
      if (epochOffset.hasError) {
        info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}")
        partitionsWithError += tp
      } else {
        val fetchOffset =
          if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
            warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
              s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
            partitionStates.stateValue(tp).fetchOffset
          } else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
            logEndOffset(replica, epochOffset)
          else
            epochOffset.endOffset
        // [Key]
        partition.truncateTo(fetchOffset, isFuture = false)
        replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, fetchOffset)
        fetchOffsets.put(tp, fetchOffset)
      }
    } catch {
      case e: KafkaStorageException =>
        info(s"Failed to truncate $tp", e)
        partitionsWithError += tp
    }
  }

  ResultWithPartitions(fetchOffsets, partitionsWithError)
}

  

  kafka.server.AbstractFetcherThread#doWork

override def doWork() {
  // [Key] possibly truncate, which can evict producer state
  maybeTruncate()
  val fetchRequest = inLock(partitionMapLock) {
    val ResultWithPartitions(fetchRequest, partitionsWithError) = buildFetchRequest(states)
    if (fetchRequest.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    handlePartitionsWithErrors(partitionsWithError)
    fetchRequest
  }
  if (!fetchRequest.isEmpty)
    processFetchRequest(fetchRequest)
}

  

  kafka.utils.ShutdownableThread#run

override def run(): Unit = {
  info("Starting")
  try {
    while (isRunning)
      // [Key] keep calling doWork() until isRunning becomes false
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
     shutdownComplete.countDown()
  }
  info("Stopped")
}
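
  The pattern here is simply a background thread that loops over doWork() until it is asked to stop. A minimal sketch of the same pattern (MiniShutdownableThread is a made-up name, not the actual Kafka class):

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

abstract class MiniShutdownableThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)
  private val stopped = new CountDownLatch(1)

  def doWork(): Unit   // e.g. maybeTruncate() then fetch, as in AbstractFetcherThread

  override def run(): Unit =
    try {
      while (running.get()) doWork()   // keeps running (and cleaning up) for the broker's lifetime
    } finally {
      stopped.countDown()
    }

  def shutdown(): Unit = {
    running.set(false)
    stopped.await()   // wait for the loop to exit
  }
}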

  

  ShutdownableThread is subclassed by kafka.server.AbstractFetcherThread

  kafka.server.AbstractFetcherThread is subclassed by kafka.server.ReplicaFetcherThread

  Once a kafka.server.ReplicaFetcherThread is created and started, the cleanup above happens automatically.

  kafka.server.ReplicaFetcherManager#createFetcherThread

override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
  val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
  val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
  // create a new ReplicaFetcherThread
  new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaManager, metrics, time, quotaManager)
}

  

  kafka.server.AbstractFetcherManager#addFetcherForPartitions

def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
  lock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialFetchOffset) =>
      BrokerAndFetcherId(brokerAndInitialFetchOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}

    def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {
      // [Key 2] create the fetcher thread
      val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
      fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
      // [Key 3] start it
      fetcherThread.start
    }

    for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
      fetcherThreadMap.get(brokerIdAndFetcherId) match {
        case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port =>
          // reuse the fetcher thread
        case Some(f) =>
          f.shutdown()
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        case None =>
          // [Key 1]
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      }

      fetcherThreadMap(brokerIdAndFetcherId).addPartitions(initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
        tp -> brokerAndInitOffset.initOffset
      })
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicPartition, brokerAndInitialOffset) =>
    "[" + topicPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}

  

  kafka.server.AbstractFetcherManager#resizeThreadPool

def resizeThreadPool(newSize: Int): Unit = {
  def migratePartitions(newSize: Int): Unit = {
    fetcherThreadMap.foreach { case (id, thread) =>
      val removedPartitions = thread.partitionsAndOffsets
      removeFetcherForPartitions(removedPartitions.keySet)
      if (id.fetcherId >= newSize)
        thread.shutdown()
      // [Key]
      addFetcherForPartitions(removedPartitions)
    }
  }
  lock synchronized {
    val currentSize = numFetchersPerBroker
    info(s"Resizing fetcher thread pool size from $currentSize to $newSize")
    numFetchersPerBroker = newSize
    if (newSize != currentSize) {
      // We could just migrate some partitions explicitly to new threads. But this is currently
      // reassigning all partitions using the new thread size so that hash-based allocation
      // works with partition add/delete as it did before.
      migratePartitions(newSize)
    }
    shutdownIdleFetcherThreads()
  }
}

  

  kafka.server.DynamicThreadPool#reconfigure

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
  if (newConfig.numIoThreads != oldConfig.numIoThreads) {
    // [Key]
    server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
  }
  if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
    server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
  if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
    // [Key]
    server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
  if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
    server.getLogManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
  if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
    // [Key]
    server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
}

  

  kafka.server.DynamicBrokerConfig#updateCurrentConfig

private def updateCurrentConfig(): Unit = {
  val newProps = mutable.Map[String, String]()
  newProps ++= staticBrokerConfigs
  overrideProps(newProps, dynamicDefaultConfigs)
  overrideProps(newProps, dynamicBrokerConfigs)
  val oldConfig = currentConfig
  val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
  if (newConfig ne currentConfig) {
    currentConfig = newConfig
    kafkaConfig.updateCurrentConfig(newConfig)

    // Process BrokerReconfigurable updates after current config is updated
    // [Key]
    brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
  }
}

  

  kafka.server.DynamicBrokerConfig#updateBrokerConfig

private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
  try {
    val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
    dynamicBrokerConfigs.clear()
    dynamicBrokerConfigs ++= props.asScala
    // [Key]
    updateCurrentConfig()
  } catch {
    case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: $persistentProps", e)
  }
}

  

  kafka.server.DynamicBrokerConfig#initialize

private[server] def initialize(zkClient: KafkaZkClient): Unit = {
  currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
  val adminZkClient = new AdminZkClient(zkClient)
  updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
  val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
  val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
  // [Key]
  updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
}

  

  kafka.server.KafkaServer#startup

    In KafkaServer's startup method, the broker's dynamic configuration is initialized:

def startup() {
  try {
    ...

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      ...

      // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
      // applied after DynamicConfigManager starts.
      // [Key]
      config.dynamicConfig.initialize(zkClient)

      ...
    }
  }
  catch {
    ...
  }
}

  With that, we have traced the code that cleans up producerId entries. Once the broker is up, its ReplicaFetcherThreads (created and sized according to the broker configuration initialized during KafkaServer startup) each run a while loop that keeps calling doWork(); doWork() calls maybeTruncate(), which can reach Log#truncateTo → loadProducerState → truncateAndReload → ProducerStateManager#truncateHead and evict stale producerId entries from the producers map. So there is no need to worry about producerId state piling up in memory and causing an OOM.

 
