
Kafka (Part 1): Code


Published: 2022-11-24 17:36:08

Producer

Producer: asynchronous send
    import java.util
    import java.util.{Properties, UUID}

    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Partitioner, ProducerConfig, ProducerRecord, RecordMetadata}
    import org.apache.kafka.common.Cluster
    import org.apache.kafka.common.serialization.StringSerializer
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
    val kafkaProducer = new KafkaProducer[String, String](properties)
    kafkaProducer.send(new ProducerRecord[String, String]("events", "kafka")) // fire-and-forget: the returned Future is ignored
    kafkaProducer.close()
Producer: synchronous send
    kafkaProducer.send(new ProducerRecord[String, String]("events", "kafka")).get() // block on the Future until the broker acknowledges
Custom partitioner
    import org.apache.kafka.clients.producer.Partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[MyPartition].getName) // register the custom partitioner below
    class MyPartition extends Partitioner {
      // Route records whose value contains "abcd" to partition 1, everything else to partition 0.
      override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int =
        if (value.toString.contains("abcd")) 1 else 0
      override def close(): Unit = {}
      override def configure(map: util.Map[String, _]): Unit = {}
    }
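The routing rule above can be checked in isolation. A minimal sketch (a standalone function of our own, not part of the Kafka API) restating the same value-based rule:

```scala
// Standalone re-statement of MyPartition's routing rule (illustrative only):
// values containing "abcd" go to partition 1, everything else to partition 0.
def route(value: String): Int = if (value.contains("abcd")) 1 else 0
```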
Transaction prerequisite: acks settings
    val ioproperties = new Properties()
    // acks=0: the producer does not wait for the data to be written to disk.
    // acks=1: the leader replies once it has received the data.
    // acks=-1 (all): the leader replies only after it and every replica in the ISR have received the data.
    // With acks=-1, a follower that has not fetched from the leader for too long (30s by default) is removed from the ISR.
    // Full reliability requires: acks=-1, replication factor >= 2, and min.insync.replicas >= 2.
    // acks=0: rarely used. acks=1: tolerates losing the odd record. acks=-1: for data that must not be lost (e.g. money-related).
    ioproperties.put(ProducerConfig.ACKS_CONFIG, "-1")
    ioproperties.put(ProducerConfig.RETRIES_CONFIG, "5") // default is Int.MaxValue; set something smaller
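The "full reliability" rule in the comments above can be written down as a predicate. A small sketch with names of our own choosing (this is not a Kafka API):

```scala
// "Fully reliable" per the notes above: acks=-1 (or "all"),
// replication factor >= 2, and min.insync.replicas >= 2.
def fullyReliable(acks: String, replicationFactor: Int, minInsyncReplicas: Int): Boolean =
  (acks == "-1" || acks == "all") && replicationFactor >= 2 && minInsyncReplicas >= 2
```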
Transaction prerequisite: idempotence
    val ioproperties = new Properties()
    // Delivery semantics:
    // 1. At Least Once (no loss, duplicates possible): acks=-1, replication factor >= 2, min.insync.replicas >= 2
    // 2. At Most Once (no duplicates, loss possible): acks=0
    // 3. Exactly Once: idempotence + At Least Once
    // Idempotence key: (PID, Partition, SeqNumber). The PID is reassigned on every producer restart,
    // Partition is the partition number, and SeqNumber increases monotonically.
    // It therefore only deduplicates within a single partition and a single session.
    // Idempotence is enabled by default.
    ioproperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
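The (PID, Partition, SeqNumber) deduplication idea can be sketched in a few lines. This is an illustration of the concept only, not actual broker code:

```scala
import scala.collection.mutable

// A message is accepted only if its (pid, partition, seqNumber) triple is new;
// a retry carrying the same triple is dropped as a duplicate.
val seen = mutable.Set.empty[(Long, Int, Long)]
def accept(pid: Long, partition: Int, seqNumber: Long): Boolean =
  seen.add((pid, partition, seqNumber))
```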
Producer transaction code
    // The transactional id must be set in the properties before the producer is created.
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString)
    kafkaProducer.initTransactions()
    kafkaProducer.beginTransaction()
    try {
      kafkaProducer.send(new ProducerRecord("events","kafka"),new Callback(){
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if(e == null){
            println(recordMetadata.topic()+" "+recordMetadata.partition())
          }
          else{
            println("error")
          }
        }
      })
      kafkaProducer.commitTransaction()
    } catch {
      case e: Exception =>
        println(e)
        kafkaProducer.abortTransaction()
    } finally {
      kafkaProducer.close()
    }
Handling out-of-order data
    // With idempotence enabled, up to 5 in-flight requests are allowed; without it, only 1 at a time guarantees order.
    // The broker caches metadata for the producer's last 5 requests and reorders them by SeqNumber, so ordering is preserved.
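The reordering idea can be sketched: with a handful of buffered requests, sorting by SeqNumber restores the original send order. An illustration of the concept, not actual broker code:

```scala
// Each in-flight request carries a monotonically increasing sequence number;
// sorting the buffered requests by it restores send order before appending to the log.
case class InFlight(seqNumber: Long, payload: String)
def restoreOrder(buffered: Seq[InFlight]): Seq[InFlight] = buffered.sortBy(_.seqNumber)
```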
Improving producer throughput
    // 1. batch.size = 16K  2. linger.ms = 5-100 ms
    // 3. compression.type = snappy  4. RecordAccumulator (buffer.memory) = 64M
    val ioproperties = new Properties()
    ioproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384") // batch size in bytes (16K)
    ioproperties.put(ProducerConfig.LINGER_MS_CONFIG, "70") // linger.ms
    // Compression: gzip, snappy, lz4, zstd
    ioproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
    // RecordAccumulator buffer size in bytes (33554432 = 32M, the default)
    ioproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")

Consumer

Consumer hello world
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import java.util.{Properties, ArrayList,Map,HashMap}
    import java.time.Duration

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
    val properties = new Properties()
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"114.116.44.117:9092")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,classOf[StringDeserializer].getName)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,classOf[StringDeserializer].getName)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test")
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000")
Consuming topic data
    val kafkaConsumer = new KafkaConsumer[String, String](properties) // consume a whole topic
    val topics = new ArrayList[String]()
    topics.add("events")
    kafkaConsumer.subscribe(topics)
Consuming a specific topic partition
    val kafkaConsumer = new KafkaConsumer[String,String](properties)
    val topics = new ArrayList[TopicPartition]()
    topics.add(new TopicPartition("events",1))
    kafkaConsumer.assign(topics)
Manually seeking to an offset
    val kafkaConsumer = new KafkaConsumer[String, String](properties)
    // Note: the consumer must subscribe or assign first, and assignment() stays empty
    // until a poll() has completed partition assignment.
    import scala.collection.JavaConversions.asScalaSet
    val assignment = kafkaConsumer.assignment().toArray[TopicPartition]
    for (partition <- assignment) {
      kafkaConsumer.seek(partition, 200) // manually seek to offset 200
    }
Seeking to an offset by timestamp
    import scala.collection.JavaConversions.asScalaSet // manual offset control
    val assignment = kafkaConsumer.assignment().toArray[TopicPartition]
    val topicPartitionHashMap: Map[TopicPartition, java.lang.Long] = new HashMap[TopicPartition, java.lang.Long]()
    for (partition <- assignment) {
      topicPartitionHashMap.put(partition, System.currentTimeMillis() - 24 * 3600 * 1000) // 24 hours ago
    }
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp
    val topicPartitionAndOffsetAndTimestamp: Map[TopicPartition, OffsetAndTimestamp] =
        kafkaConsumer.offsetsForTimes(topicPartitionHashMap)
    for (topicPartition <- assignment) {
      // offsetsForTimes maps a partition to null when no record exists at or after the timestamp
      val offsetAndTimestamp = topicPartitionAndOffsetAndTimestamp.get(topicPartition)
      if (offsetAndTimestamp != null) kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset())
    }

    val consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1))
    import scala.collection.JavaConversions.iterableAsScalaIterable
    for (record <- consumerRecords) {
      println(record.topic() + " " + record.partition() + " " + record.offset() + " " + record.value())
    }
Original post: https://www.cnblogs.com/wuxiaolong4/p/16800468.html