SpringBoot实战(十四)之整合KafKa_玖富娱乐主管发布


玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。

 本身今天上午参考了很多博文,发明很多博文不是迥殊好,不是因为依靠争执题目就是因为版本题目。

因而我连系相干的博文和案例,本身改写了下并参考了下,因而就有了这篇文章。愿望能够或许给人人资助,少走一些弯路。

 

一、KafKa的引见

1.重要功用

依据官网的引见,ApacheKafka®是一个分布式流媒体平台,它重要有3种功用:

  a.宣布和定阅音讯流,这个功用类似于音讯行列,这也是kafka归类为音讯行列框架的缘由。

  b.以容错的体式格局纪录音讯流,kafka以文件的体式格局来存储音讯流。

  c.能够再音讯宣布的时刻举行处置惩罚。

 

2.运用场景

a.在体系或应用顺序之间构建牢靠的用于传输及时数据的管道,音讯行列功用。

b.构建及时的流数据处置惩罚顺序来变更或处置惩罚数据流,数据处置惩罚功用。

 

3.细致引见

 Kafka现在重要作为一个分布式的宣布定阅式的音讯体系运用,下面简朴引见一下kafka的基本机制

音讯传输历程:

 

Producer即生产者,向Kafka集群发送音讯,在发送音讯之前,会对音讯举行分类,即Topic,上图展现了两个producer发送了分类为topic1的音讯,别的一个发送了topic2的音讯。

 

Topic即主题,经由过程对音讯指定主题能够将音讯分类,消费者能够只存眷本身须要的Topic中的音讯

 

Consumer即消费者,消费者经由过程与kafka集群竖立长衔接的体式格局,不断地从集群中拉取音讯,然后能够对这些音讯举行处置惩罚。

 

二、装置

装置包下载地点:http://kafka.apache.org/downloads

找到0.11.0.1版本,如图:

1.下载

wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

 

2.解压

tar -xzvf kafka_2.11-0.11.0.1.tgz

设置装备摆设说明:

    consumer.properites 消费者设置装备摆设,这个设置装备摆设文件用于设置装备摆设开启的消费者,此处我们运用默许的便可。

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。-

    producer.properties 生产者设置装备摆设,这个设置装备摆设文件用于设置装备摆设开启的生产者,此处我们运用默许的便可。

  server.properties kafka服务器的设置装备摆设,此设置装备摆设文件用来设置装备摆设kafka服务器,现在仅引见几个最基本的设置装备摆设。

       a.broker.id 说明以后kafka服务器在集群中的独一ID,需设置装备摆设为integer,而且集群中的每个kafka服务器的id都应是独一的,我们这里接纳默许设置装备摆设便可。

       b.listeners 说明此kafka服务器须要监听的端口号,若是是在本机上跑虚拟机运转能够不消设置装备摆设本项,默许会运用localhost的地点,若是是在长途服务器上运转则必需设置装备摆设,

比方:listeners=PLAINTEXT:// 192.168.126.143:9092。并确保服务器的9092端口能够或许接见。

  c.zookeeper.connect 说明kafka所衔接的zookeeper的地点 ,需设置装备摆设为zookeeper的地点,因为本次运用的是kafka高版本中自带zookeeper,

运用默许设置装备摆设便可,zookeeper.connect=localhost:2181。

 

3.运转

起首运转zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

运转胜利,显现如图:

 

然后运转kafka

bin/kafka-server-start.sh config/server.properties

 运转胜利,显现如图:

 

三、整合KafKa

1.新建Maven项目导入Maven依靠

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.test</groupId>
  <artifactId>kafka_demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>

    </dependencies>
 
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            
   <!-- 指定编译版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
   
        
    
            
    
        
        <finalName>${project.artifactId}</finalName>
            

    </build>

   
</project>

 

2.编写音讯实体

package com.springboot.kafka.bean;


import java.util.Date;

import lombok.Data;
 


@Data
public class Message {
    private Long id;    //id

    private String msg; //音讯

    private Date sendTime;  //时候戳

}

 有了lombok,每次编写实体不必要运用快捷键天生seter或geter要领了,代码看起来越发简约了。

 

3.编写音讯发送者(能够理解为生产者,最好联络细致引见中的图)

package com.springboot.kafka.producer;

import java.util.Date;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    //发送音讯要领
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("                       message = {}", gson.toJson(message));
        kafkaTemplate.send("zhisheng", gson.toJson(message));
    }
}

 

4.编写音讯接收者(能够理解为消费者)

package com.springboot.kafka.producer;

import java.util.Date;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    //发送音讯要领
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("                       message = {}", gson.toJson(message));
        kafkaTemplate.send("zhisheng", gson.toJson(message));
    }
}

 

5.编写启动类

package com.springboot.kafka;
 


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import com.springboot.kafka.producer.KafkaSender;


@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {

        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);

        KafkaSender sender = context.getBean(KafkaSender.class);

        for (int i = 0; i < 3; i  ) {
            //挪用音讯发送类中的音讯发送要领
            sender.send();

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

 

6.编写application.properties设置装备摆设文件

#============== kafka ===================
# u6307u5B9Akafka u4EE3u7406u5730u5740uFF0Cu53EFu4EE5u591Au4E2A
spring.kafka.bootstrap-servers=192.168.126.143:9092

#=============== provider  =======================

spring.kafka.producer.retries=0
# u6BCFu6B21u6279u91CFu53D1u9001u6D88u606Fu7684u6570u91CF
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# u6307u5B9Au6D88u606Fkeyu548Cu6D88u606Fu4F53u7684u7F16u89E3u7801u65B9u5F0F
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# u6307u5B9Au9ED8u8BA4u6D88u8D39u8005group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# u6307u5B9Au6D88u606Fkeyu548Cu6D88u606Fu4F53u7684u7F16u89E3u7801u65B9u5F0F
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

7.运转结果

 

示例代码地点:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo

若是依照上述流程没有到达估计的结果能够git clone到当地。

 

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。