KafkaManager.java 1.32 KB
Newer Older
张辰's avatar
张辰 committed
1 2
package com.tanpu.community.manager;

张辰's avatar
张辰 committed
3 4 5
import com.alibaba.fastjson.JSON;
import com.tanpu.community.api.beans.KafkaDurationUptMsg;
import com.tanpu.community.service.VisitSummaryService;
张辰's avatar
张辰 committed
6 7 8 9 10 11 12 13 14 15 16 17 18 19
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaManager {

    // Spring-injected Kafka producer template (String key, String value).
    // NOTE(review): the only reference visible in this file is the commented-out
    // sendMessage method — confirm whether this field is still needed.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

张辰's avatar
张辰 committed
20 21 22 23
    // Spring-injected service; consumeDurationUpdate calls its updateDurByIdent method.
    @Autowired
    private VisitSummaryService visitSummaryService;


张辰's avatar
张辰 committed
24 25 26 27 28 29 30 31 32 33 34 35
//    public void sendMessage(String message) {
//        System.out.println("#### send " + message);
//        this.kafkaTemplate.send("users", message);
//    }



    // TODO: confirm the topic name. This listener and consumeDurationUpdate both subscribe
    // to "newCommunityVisitor". NOTE(review): if the two listeners also share a consumer
    // group, each record would be delivered to only one of them — verify against the
    // consumer configuration.
    /**
     * Receives a raw message from the {@code newCommunityVisitor} topic and logs it.
     *
     * <p>The payload is only logged; it is neither parsed nor persisted here.
     *
     * @param message raw record value handed over by the Kafka listener container
     */
    @KafkaListener(topics = "newCommunityVisitor")
    public void consumeVisitorHis(String message) {
        // Log through the Lombok-generated SLF4J logger instead of stdout so the output
        // follows the application's logging configuration (level, appender, format).
        log.info("#### receive {}", message);
    }
张辰's avatar
张辰 committed
36 37 38 39 40 41 42

    // TODO: confirm the topic name (currently the same topic as consumeVisitorHis).
    /**
     * Parses a duration-update message and forwards the increment to
     * {@code visitSummaryService.updateDurByIdent}.
     *
     * <p>Payloads that deserialize to {@code null}, or that carry no duration increment,
     * are logged and skipped instead of raising a {@link NullPointerException} inside the
     * listener. Malformed JSON still propagates the parser's exception, as before.
     *
     * @param message raw record value, expected to be a JSON-encoded
     *                {@link KafkaDurationUptMsg}
     */
    @KafkaListener(topics = "newCommunityVisitor")
    public void consumeDurationUpdate(String message) {
        KafkaDurationUptMsg msg = JSON.parseObject(message, KafkaDurationUptMsg.class);

        // fastjson yields null for null / empty / literal "null" input.
        if (msg == null) {
            log.warn("Skipping duration update: payload parsed to null, message={}", message);
            return;
        }

        // Without this guard the intValue() call below would fail on a missing increment.
        if (msg.getDurMillsInc() == null) {
            log.warn("Skipping duration update: durMillsInc is missing, message={}", message);
            return;
        }

        visitSummaryService.updateDurByIdent(msg.getIdent(), msg.getDurMillsInc().intValue());
    }
张辰's avatar
张辰 committed
43
}