Commit 08c07618 authored by 刘基明's avatar 刘基明

Merge remote-tracking branch 'origin/dev' into dev

parents 12861120 e106ed6e
...@@ -9,7 +9,7 @@ public enum FileTypeEnum { ...@@ -9,7 +9,7 @@ public enum FileTypeEnum {
IMAGE(1,"图片类型"),QUERY_IDOLS(2,"查询关注"); IMAGE(1,"图片类型"),QUERY_IDOLS(2,"查询关注");
public static final HashSet<String> imageTypeSet = SetUtils.hashSet("jpg", "jpeg", "png"); public static final HashSet<String> ossTypeSet = SetUtils.hashSet("jpg", "jpeg", "png", "txt");
private Integer code; private Integer code;
private String type; private String type;
......
...@@ -10,6 +10,7 @@ import com.tanpu.community.api.beans.resp.FileUploadResp; ...@@ -10,6 +10,7 @@ import com.tanpu.community.api.beans.resp.FileUploadResp;
import com.tanpu.community.api.enums.OssDirEnum; import com.tanpu.community.api.enums.OssDirEnum;
import com.tanpu.community.cache.RedisCache; import com.tanpu.community.cache.RedisCache;
import com.tanpu.community.manager.FileManager; import com.tanpu.community.manager.FileManager;
import com.tanpu.community.service.RankLogService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.caffeine.CaffeineCacheManager; import org.springframework.cache.caffeine.CaffeineCacheManager;
...@@ -28,6 +29,9 @@ public class FileController { ...@@ -28,6 +29,9 @@ public class FileController {
@Autowired @Autowired
private FileManager fileManager; private FileManager fileManager;
@Autowired
private RankLogService rankLogService;
@Resource @Resource
private UserHolder userHolder; private UserHolder userHolder;
...@@ -47,28 +51,16 @@ public class FileController { ...@@ -47,28 +51,16 @@ public class FileController {
return CommonResp.success(fileManager.uploadFile(file, OssDirEnum.Theme_Pic, userId)); return CommonResp.success(fileManager.uploadFile(file, OssDirEnum.Theme_Pic, userId));
} }
@GetMapping("/test") @GetMapping("/clearRankLog")
public String test() { public String clearRankLog() {
Thread t = new Thread(new Runnable() {
@Override
// redisCache.put("11111", JsonUtil.toJson(list), 60); public void run() {
// rankLogService.clearRankLog();
// String v = redisCache.get("11111"); }
// System.out.println(v); });
// System.out.println(JsonUtil.toJson(JsonUtil.toBean(v, new TypeReference<List<String>>() {
// })));
// localCache.getCache("local").put("999", "6666666"); t.start();
// System.out.println((String) localCache.getCache("local").get("999").get()); return "success";
//
//
// for (int i = 0; i < 30; i++) {
// System.out.println(fileManager.getId("" + i / 2));
// }
//
// for (int i = 30; i > 0; i--) {
// System.out.println(fileManager.getId("" + i / 2));
// }
return "";
} }
} }
...@@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; ...@@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Select;
import java.util.List;
/** /**
* <p> * <p>
* 话题排序日志记录’ Mapper 接口 * 话题排序日志记录’ Mapper 接口
...@@ -17,4 +19,7 @@ public interface RankLogMapper extends BaseMapper<RankLogEntity> { ...@@ -17,4 +19,7 @@ public interface RankLogMapper extends BaseMapper<RankLogEntity> {
@Select("select max(round) from rank_log where type = #{type}") @Select("select max(round) from rank_log where type = #{type}")
Long selectMaxRound(@Param("type") Integer type); Long selectMaxRound(@Param("type") Integer type);
@Select("select * from rank_log where type = #{type} order by id asc limit #{batchSize}")
List<RankLogEntity> selectByTypeLimit(@Param("type") Integer type, @Param("batchSize") Integer batchSize);
} }
\ No newline at end of file
package com.tanpu.community.manager; package com.tanpu.community.manager;
import com.tanpu.community.service.RankService; import com.tanpu.community.service.*;
import com.tanpu.community.service.RecommendService;
import com.tanpu.community.service.RedisService;
import com.tanpu.community.service.VisitLogService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -24,6 +21,9 @@ public class ConJobManager { ...@@ -24,6 +21,9 @@ public class ConJobManager {
@Autowired @Autowired
private RankService rankService; private RankService rankService;
@Autowired
private RankLogService rankLogService;
@Autowired @Autowired
private RecommendService recommendService; private RecommendService recommendService;
...@@ -53,4 +53,12 @@ public class ConJobManager { ...@@ -53,4 +53,12 @@ public class ConJobManager {
public void getThemeNewest() { public void getThemeNewest() {
recommendService.refreshNewestThemes(); recommendService.refreshNewestThemes();
} }
/**
* 定时把rank_log的日志拿出来,清理数据库
*/
@Scheduled(cron = "0 0 0 ? * 1")
public void clearRankLog() {
rankLogService.clearRankLog();
}
} }
...@@ -31,7 +31,7 @@ public class VisitSummaryManager { ...@@ -31,7 +31,7 @@ public class VisitSummaryManager {
@KafkaListener(topics = kafakTopic) @KafkaListener(topics = kafakTopic)
public void updateVisitSummary(String message) { public void updateVisitSummary(String message) {
// {"durMillsInc":10000,"ident":"AD7B8CE8-2DA4-4FB4-907F-C551B926BA5C","localDate":"2021-08-02","pageId":"p13503","refId":"88737580570230824","visitorId":"275321532031467520"} // {"durMillsInc":10000,"ident":"AD7B8CE8-2DA4-4FB4-907F-C551B926BA5C","localDate":"2021-08-02","pageId":"p13503","refId":"88737580570230824","visitorId":"275321532031467520"}
log.info("receive kafka msg: {}", message); // log.info("receive kafka msg: {}", message);
KafkaDurationUptMsg msg = JSON.parseObject(message, KafkaDurationUptMsg.class); KafkaDurationUptMsg msg = JSON.parseObject(message, KafkaDurationUptMsg.class);
// ident在每次进入新页面 & 回退 的时候都会随机生成一个,所以用ident做唯一key即可。 // ident在每次进入新页面 & 回退 的时候都会随机生成一个,所以用ident做唯一key即可。
......
...@@ -8,7 +8,6 @@ import com.tanpu.common.util.JsonUtil; ...@@ -8,7 +8,6 @@ import com.tanpu.common.util.JsonUtil;
import com.tanpu.common.uuid.UuidGenHelper; import com.tanpu.common.uuid.UuidGenHelper;
import com.tanpu.community.api.CommunityConstant; import com.tanpu.community.api.CommunityConstant;
import com.tanpu.community.api.enums.FileTypeEnum; import com.tanpu.community.api.enums.FileTypeEnum;
import com.tanpu.community.api.enums.OssRelType;
import com.tanpu.community.dao.entity.community.FileRecordEntity; import com.tanpu.community.dao.entity.community.FileRecordEntity;
import com.tanpu.community.dao.mapper.community.FileRecordMapper; import com.tanpu.community.dao.mapper.community.FileRecordMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -67,32 +66,34 @@ public class OSSFileService { ...@@ -67,32 +66,34 @@ public class OSSFileService {
public FileRecordEntity uploadFile(byte[] data, String fileName, String fileSuffix, String dirPrefix) { public FileRecordEntity uploadFile(byte[] data, String fileName, String fileSuffix, String dirPrefix) {
String[] arr = StringUtils.split(fileName, "."); String[] arr = StringUtils.split(fileName, ".");
String suffix = arr[arr.length - 1]; String suffix = arr[arr.length - 1];
if (FileTypeEnum.imageTypeSet.contains(suffix)) { //上传
//上传 String id = uuidGenHelper.getUuidStr();
String id = uuidGenHelper.getUuidStr(); String key = CommunityConstant.OSS_PREFIX_FOLDER + dirPrefix + id + "." + suffix;
String key = CommunityConstant.OSS_PREFIX_FOLDER + dirPrefix + id + "." + suffix; ossHelper.writeFile(bucketName, key, data, fileSuffix);
ossHelper.writeFile(bucketName, key, data, fileSuffix);
//落库
//落库 FileRecordEntity record = new FileRecordEntity();
FileRecordEntity record = new FileRecordEntity(); record.setFileId(uuidGenHelper.getUuidStr());
record.setFileId(uuidGenHelper.getUuidStr()); record.setDeleteTag(BizStatus.DeleteTag.tag_init);
record.setDeleteTag(BizStatus.DeleteTag.tag_init); record.setFileOssKey(key);
record.setFileOssKey(key); record.setFileName(fileName);
record.setFileName(fileName); record.setFileId(id);
record.setFileId(id); record.setPreviewUrl(ossHelper.getPreSignedUrl(bucketName, key));
record.setPreviewUrl(ossHelper.getPreSignedUrl(bucketName, key)); record.setFileType(FileTypeEnum.IMAGE.getCode());
record.setFileType(FileTypeEnum.IMAGE.getCode()); HashMap<String, Integer> attr = getStringIntegerHashMap(data);
HashMap<String, Integer> attr = getStringIntegerHashMap(data); record.setExtInfo(JsonUtil.toJson(attr));
record.setExtInfo(JsonUtil.toJson(attr));
fileRecordMapper.insert(record);
fileRecordMapper.insert(record);
return record;
return record; }
}else {
throw new BizException("文件格式暂不支持:"+suffix);
}
public void uploadFileNoRecord(byte[] data, String fileName, String fileSuffix, String dirPrefix) {
String[] arr = StringUtils.split(fileName, ".");
String suffix = arr[arr.length - 1];
//上传
String key = CommunityConstant.OSS_PREFIX_FOLDER + dirPrefix + suffix;
ossHelper.writeFile(bucketName, key, data, fileSuffix);
} }
private HashMap<String, Integer> getStringIntegerHashMap(byte[] data) { private HashMap<String, Integer> getStringIntegerHashMap(byte[] data) {
......
package com.tanpu.community.service; package com.tanpu.community.service;
import com.alibaba.fastjson.JSON;
import com.tanpu.common.util.JsonUtil; import com.tanpu.common.util.JsonUtil;
import com.tanpu.community.api.beans.qo.ThemeAnalysDO; import com.tanpu.community.api.beans.qo.ThemeAnalysDO;
import com.tanpu.community.api.beans.qo.TopicRankQo; import com.tanpu.community.api.beans.qo.TopicRankQo;
...@@ -7,17 +8,28 @@ import com.tanpu.community.api.enums.RankLogTypeEnum; ...@@ -7,17 +8,28 @@ import com.tanpu.community.api.enums.RankLogTypeEnum;
import com.tanpu.community.dao.entity.community.RankLogEntity; import com.tanpu.community.dao.entity.community.RankLogEntity;
import com.tanpu.community.dao.mapper.community.RankLogMapper; import com.tanpu.community.dao.mapper.community.RankLogMapper;
import com.tanpu.community.util.BizUtils; import com.tanpu.community.util.BizUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.ByteArrayOutputStream;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Service @Service
public class RankLogService { public class RankLogService {
@Autowired
private OSSFileService ossFileService;
@Resource @Resource
private RankLogMapper rankLogMapper; private RankLogMapper rankLogMapper;
...@@ -79,4 +91,40 @@ public class RankLogService { ...@@ -79,4 +91,40 @@ public class RankLogService {
} }
} }
// 定时清除ranklog,并上传到oss
public void clearRankLog() {
LocalDateTime t = LocalDateTime.now().minusDays(7L);
String d = t.format(DateTimeFormatter.BASIC_ISO_DATE);
log.info("start clearRankLog job before {}", d);
for (RankLogTypeEnum type : RankLogTypeEnum.values()) {
int idx = 0;
while (true) {
List<RankLogEntity> logs = rankLogMapper.selectByTypeLimit(type.getCode(), 100);
if (logs.isEmpty() || logs.get(0).getRankTime().isAfter(t)) {
break;
}
try {
String fileName = "ranklog_" + type.getCode() + "_" + idx;
ByteArrayOutputStream os = new ByteArrayOutputStream();
IOUtils.writeLines(logs.stream().map(JSON::toJSONString).collect(Collectors.toList()), null, os);
ossFileService.uploadFileNoRecord(os.toByteArray(), fileName, ".txt", "rankLog/");
Thread.sleep(1000);
} catch (Exception e) {
log.error("error in clearRankLog", e);
throw new RuntimeException(e);
}
// delete
List<Long> ids = logs.stream().map(RankLogEntity::getId).collect(Collectors.toList());
rankLogMapper.deleteBatchIds(ids);
idx++;
}
}
}
} }
...@@ -195,7 +195,6 @@ public class RankService { ...@@ -195,7 +195,6 @@ public class RankService {
//落库 //落库
rankLogService.logTopicRank(rankList, start, TimeUtils.calMillisTillNow(start)); rankLogService.logTopicRank(rankList, start, TimeUtils.calMillisTillNow(start));
return; return;
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment