星尘使用redis缓存

This commit is contained in:
wulin 2023-11-21 01:45:53 +08:00
parent 7374bdfb14
commit 037a57ea28
2 changed files with 74 additions and 31 deletions

View File

@ -6,17 +6,24 @@ import com.alibaba.dashscope.aigc.generation.models.QwenParam;
import com.alibaba.dashscope.common.MessageManager; import com.alibaba.dashscope.common.MessageManager;
import com.alibaba.dashscope.common.ResultCallback; import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.common.Role; import com.alibaba.dashscope.common.Role;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.xingchen.ApiClient; import com.alibaba.xingchen.ApiClient;
import com.alibaba.xingchen.api.ChatApiSub; import com.alibaba.xingchen.api.ChatApiSub;
import com.alibaba.xingchen.auth.HttpBearerAuth; import com.alibaba.xingchen.auth.HttpBearerAuth;
import com.alibaba.xingchen.model.*; import com.alibaba.xingchen.model.*;
import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse; import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse;
import com.qiuguo.iot.third.service.IQianWen; import com.qiuguo.iot.third.service.IQianWen;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
@ -39,6 +46,7 @@ public class AliYunXingChen {
ChatReqParams chatReqParams; ChatReqParams chatReqParams;
String userName; String userName;
String userId;
public AliYunXingChen(String basePath, String key, String roleId, String userId, String userName) { public AliYunXingChen(String basePath, String key, String roleId, String userId, String userName) {
api = new ChatApiSub(); api = new ChatApiSub();
ApiClient apiClient = new ApiClient(); ApiClient apiClient = new ApiClient();
@ -51,6 +59,7 @@ public class AliYunXingChen {
authorization.setBearerToken(key); authorization.setBearerToken(key);
api.setApiClient(apiClient); api.setApiClient(apiClient);
this.userName = userName; this.userName = userName;
this.userId = userId;
chatReqParams = ChatReqParams.builder() chatReqParams = ChatReqParams.builder()
.botProfile( .botProfile(
CharacterKey.builder() CharacterKey.builder()
@ -74,39 +83,70 @@ public class AliYunXingChen {
} }
public Mono<Boolean> sendMessage(String msg, public Mono<Boolean> sendMessage(String msg,
IQianWen<? super String> onNext, IQianWen<? super String> onNext,
QWenReplyResponse qwenReplyResponse){ QWenReplyResponse qwenReplyResponse,
ReactiveStringRedisTemplate reactiveStringRedisTemplate){
log.info("调用通义星尘回答:{}", msg); log.info("调用通义星尘回答:{}", msg);
Message message = Message.builder().name(userName).role("user").content(msg).build(); return reactiveStringRedisTemplate.opsForValue().get(RedisConstans.TONGYI_TALK_CONTENT + userId)
addMessage(message); .defaultIfEmpty("")
chatReqParams.setMessages(messages); .map(s -> {
qwenReplyResponse.setCode(200); try{
return Mono.defer(() -> { if(StringUtils.isNotEmpty(s)){
try { log.info("缓存不为空");
Flowable<ChatResult> response = api.streamOut(chatReqParams); JSONArray jsonArray = JSON.parseArray(s);
RecordMessage recordMessage = new RecordMessage(); messages.clear();
response.blockingForEach(m -> { int i = 0;
String v = m.getChoices().get(0).getMessages().get(0).getContent().replaceAll(recordMessage.getMsg(), ""); if(jsonArray.size() > 49){
onNext.sendMessage(v); i = 1;
recordMessage.setMsg(m.getChoices().get(0).getMessages().get(0).getContent()); }
if("stop".equals(m.getChoices().get(0).getStopReason())){ for(; i < jsonArray.size(); ++i){
// Message m = JSONObject.toJavaObject(jsonArray.getJSONObject(i), Message.class);
messages.add(m);
Message message1 = Message.builder() }
.role(m.getChoices().get(0).getMessages().get(0).getRole()) }
.content(m.getChoices().get(0).getMessages().get(0).getContent()) }catch (Exception e){
.build(); log.info("聊天缓存转换异常{}", e);
qwenReplyResponse.setResut(message1.getContent()); }
addMessage(message1); finally {
onNext.finish(); Message message = Message.builder().name(userName).role("user").content(msg).build();
messages.add(message);
return messages;
} }
});
}catch (Exception e){
log.info("调用星尘异常{}", e);
qwenReplyResponse.setCode(500);
}
return Mono.just(true);
});
}).flatMap(msgs ->{
chatReqParams.setMessages(messages);
qwenReplyResponse.setCode(200);
try {
Flowable<ChatResult> response = api.streamOut(chatReqParams);
RecordMessage recordMessage = new RecordMessage();
response.blockingForEach(m -> {
String v = m.getChoices().get(0).getMessages().get(0).getContent().replaceAll(recordMessage.getMsg(), "");
onNext.sendMessage(v);
recordMessage.setMsg(m.getChoices().get(0).getMessages().get(0).getContent());
if("stop".equals(m.getChoices().get(0).getStopReason())){
//
Message message1 = Message.builder()
.role(m.getChoices().get(0).getMessages().get(0).getRole())
.content(m.getChoices().get(0).getMessages().get(0).getContent())
.build();
qwenReplyResponse.setResut(message1.getContent());
addMessage(message1);
onNext.finish();
reactiveStringRedisTemplate.opsForValue().set(RedisConstans.TONGYI_TALK_CONTENT + userId, JSONObject.toJSONString(messages), Duration.ofDays(1L))
.map(b -> {
log.info("保存聊天缓存状态{}", b);
return b;
}).subscribe();
}
});
return Mono.just(true);
}catch (Exception e){
log.info("调用星尘异常{}", e);
qwenReplyResponse.setCode(500);
}
return Mono.just(false);
});
} }
private void addMessage(Message msg){ private void addMessage(Message msg){

View File

@ -32,6 +32,9 @@ import java.util.stream.Collectors;
@Service @Service
public class TongYiXinChenService implements ITongYi{ public class TongYiXinChenService implements ITongYi{
@Resource
ReactiveStringRedisTemplate reactiveStringRedisTemplate;
protected static ConcurrentHashMap<String, AliYunXingChen> qianwenGroup = new ConcurrentHashMap<>(); protected static ConcurrentHashMap<String, AliYunXingChen> qianwenGroup = new ConcurrentHashMap<>();
@Override @Override
@ -47,7 +50,7 @@ public class TongYiXinChenService implements ITongYi{
} }
QWenReplyResponse qWenReplyResponse = new QWenReplyResponse(); QWenReplyResponse qWenReplyResponse = new QWenReplyResponse();
return aliXingChen.sendMessage(rest.getText(), onNext, qWenReplyResponse).flatMap(b -> { return aliXingChen.sendMessage(rest.getText(), onNext, qWenReplyResponse, reactiveStringRedisTemplate).flatMap(b -> {
return Mono.just(qWenReplyResponse); return Mono.just(qWenReplyResponse);
}); });