Speeding up traversal and parsing with Java multithreading
Posted: 2022-02-14 | Category: Multithreading

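Original requirement

Export doctor records.

// Group by patient id; within each group, sort by visit time ascending

The code above is the slow version: each export of 10,500 records takes about five to six minutes to produce a result. The data stored in ES is in JSON format, so it also has to be parsed on every export.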
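The grouping step named in the comment above can be sketched roughly as follows. This is a minimal illustration, not the original export code: the `Visit` class and its two fields are hypothetical stand-ins for the real record type.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupAndSortDemo {
    // Hypothetical stand-in for the real record type; only the two fields used here.
    static final class Visit {
        private final long userId;
        private final long visitTime; // assumed to be epoch millis

        Visit(long userId, long visitTime) {
            this.userId = userId;
            this.visitTime = visitTime;
        }

        long userId() { return userId; }
        long visitTime() { return visitTime; }
    }

    // Group visits by patient id, then sort each group by visit time ascending.
    static Map<Long, List<Visit>> groupAndSort(List<Visit> visits) {
        Map<Long, List<Visit>> byUser = visits.stream()
                .collect(Collectors.groupingBy(Visit::userId));
        byUser.values().forEach(group -> group.sort(Comparator.comparingLong(Visit::visitTime)));
        return byUser;
    }

    public static void main(String[] args) {
        List<Visit> visits = new ArrayList<>();
        visits.add(new Visit(1L, 30L));
        visits.add(new Visit(2L, 5L));
        visits.add(new Visit(1L, 10L));
        Map<Long, List<Visit>> grouped = groupAndSort(visits);
        // Patient 1 has two visits; the earlier one comes first after sorting.
        System.out.println(grouped.get(1L).get(0).visitTime());
    }
}
```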

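Initial optimization

The first idea was to parse the JSON at insert time and store the parsed result in ES, so that an export query no longer has to parse anything at read time. The automatic parsing is done by a service that uses canal to listen to MySQL; the JSON-parsing code was split out and moved into that service.

After this first optimization, it still takes about two minutes to produce a result.

The main cost turned out to be the code inside the for loop: for every user, ES has to be queried 7 times, one query after another (there are seven questionnaires, and each questionnaire was split into its own ES index, 7 indexes in total). This part was single-threaded, so I used CompletableFuture to turn the queries into parallel tasks. Because these operations have no return value, I did not wait for them with join (with join added, it was not much faster).

Testing showed the speed had nearly doubled, but I kept looking for further optimizations.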
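As a rough illustration of fanning out several independent lookups with CompletableFuture, here is a minimal sketch. The counter stands in for the ES queries (an assumption for the example; the real query methods are not reproduced), and `runLookups` is a hypothetical name. Note that this sketch does call `join`: without it, nothing guarantees the tasks have finished writing their results by the time the caller reads them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FanOutDemo {
    // Runs `lookups` independent tasks in parallel and waits for all of them.
    // Each task stands in for one ES index query (hypothetical placeholder work).
    static int runLookups(int lookups, ExecutorService pool) {
        AtomicInteger filled = new AtomicInteger();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[lookups];
        for (int i = 0; i < lookups; i++) {
            futures[i] = CompletableFuture.runAsync(filled::incrementAndGet, pool);
        }
        // Block until every task has completed; only then is the count reliable.
        CompletableFuture.allOf(futures).join();
        return filled.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(runLookups(7, pool)); // prints 7
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing an explicit executor keeps these tasks off the common ForkJoinPool, which is what `runAsync` uses when no executor is given.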

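CountDownLatch optimization

Looking at the core slow code again, the main cause is the for loop. So I wrote a method that splits the map Map<Long, List> patientIdGroupMap into a list of equal-sized chunks, and used multiple threads to process those chunks and write the data.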
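The splitting helper itself is not shown in this post. A minimal sketch of what such a method might look like is below, assuming the second argument is the maximum number of entries per chunk; the name `mapChunk` mirrors the call in the attached code, but this body is a guess, not the original utility.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MapChunkDemo {
    // Splits `source` into consecutive sub-maps holding at most `chunkSize` entries each.
    static <K, V> List<Map<K, V>> mapChunk(Map<K, V> source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Map<K, V>> chunks = new ArrayList<>();
        Map<K, V> current = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : source.entrySet()) {
            current.put(e.getKey(), e.getValue());
            if (current.size() == chunkSize) {
                chunks.add(current);
                current = new LinkedHashMap<>();
            }
        }
        // Keep the last, partially filled chunk if there is one.
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        Map<Long, String> all = new LinkedHashMap<>();
        for (long id = 0; id < 31; id++) {
            all.put(id, "patient-" + id);
        }
        // Same sizing rule as the attached code: size / 15 + 1 entries per chunk.
        List<Map<Long, String>> chunks = mapChunk(all, all.size() / 15 + 1);
        System.out.println(chunks.size()); // prints 11
    }
}
```

Under this reading, a chunk size of `size / 15 + 1` always yields at most 15 chunks, which bounds the number of tasks submitted to the pool.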

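There were many pitfalls when I wrote this myself, and I only got past them by debugging step by step. The first is the problem of multiple threads writing to one list: a counter is needed that is decremented each time a thread finishes. My earlier idea was to use a CyclicBarrier to wait for all threads to finish and then continue together.

Testing the code above with Thread().start() worked fine, but after switching to a custom fixed-size thread pool there was a deadlock!!

Because the custom pool is fixed-size, it gets stuck here: when the pool has fewer threads than there are chunks, the tasks that are already running block at the barrier and hold on to their threads, the remaining tasks stay in the queue and never run, and so the barrier is never tripped.

I switched to CountDownLatch, which blocks the main thread until all the parsing tasks have been executed. But in testing, the final total size of the list was always wrong at first. I initially thought the join method was not blocking, so I switched to yet another multithreading approach. On closer inspection, the real problem was the type of the list that the multiple threads were adding to: a plain ArrayList is not thread-safe, which is why the attached code uses Collections.synchronizedList.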
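The stall can be reproduced in isolation with a small sketch like the one below. It is an illustration under assumed numbers (a pool of 2 threads and 3 tasks), not the original export code, and it uses a timed `await` in the caller so that the demo reports the stall instead of hanging forever.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BarrierPoolDemo {
    // Returns true if every party reached the barrier within the timeout.
    static boolean allArrive(int poolSize, int tasks, long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CyclicBarrier barrier = new CyclicBarrier(tasks + 1); // workers plus the caller
        try {
            for (int i = 0; i < tasks; i++) {
                pool.submit(() -> {
                    try {
                        barrier.await(); // holds the pool thread until all parties arrive
                    } catch (InterruptedException | BrokenBarrierException e) {
                        // Barrier was broken or the pool was shut down: just exit.
                    }
                });
            }
            barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | BrokenBarrierException e) {
            return false; // some tasks never got a thread, so the barrier never tripped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(allArrive(2, 3, 500));  // prints false: the third task stays queued
        System.out.println(allArrive(3, 3, 5000)); // prints true: every task has its own thread
    }
}
```

With one thread per task (as with plain `new Thread(...).start()`), every party can reach the barrier, which matches the observation that the problem only appeared after moving to the fixed-size pool.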
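A minimal sketch of the latch pattern with a thread-safe result list is shown below. The pool size and the integer payload are assumptions made for the example; only the combination of `CountDownLatch` and `Collections.synchronizedList` comes from the attached code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LatchCollectDemo {
    // Submits `tasks` workers that each add `itemsPerTask` items to a shared list,
    // waits for all of them, and returns the final list size (-1 if interrupted).
    static int collect(int tasks, int itemsPerTask) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // A plain ArrayList here can lose elements under concurrent add().
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(tasks);
        try {
            for (int t = 0; t < tasks; t++) {
                pool.submit(() -> {
                    try {
                        for (int i = 0; i < itemsPerTask; i++) {
                            results.add(i);
                        }
                    } finally {
                        latch.countDown(); // always count down, even if the work throws
                    }
                });
            }
            latch.await(); // the caller blocks; pool threads are never held waiting
            return results.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect(20, 1000)); // prints 20000
    }
}
```

Unlike the barrier, the latch only makes the caller wait, so it works even when there are more tasks than pool threads.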

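Optimizing with CompletableFuture

At first I had not noticed this problem, so I wrote one more way of starting multiple threads, also using CompletableFuture.

Final timing on the test server:

Code attached: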
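One alternative worth sketching, which is not the approach used in the attached code, is to have each chunk return its own list via `supplyAsync` and merge the results afterwards, so that no list is shared between threads at all. The doubling of integers is placeholder work, and `processAll` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChunkMergeDemo {
    // Processes each chunk on the pool into a task-local list, then merges in chunk order.
    static List<Integer> processAll(List<List<Integer>> chunks, ExecutorService pool) {
        List<CompletableFuture<List<Integer>>> futures = new ArrayList<>();
        for (List<Integer> chunk : chunks) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                List<Integer> local = new ArrayList<>(); // only this task touches it
                for (Integer x : chunk) {
                    local.add(x * 2); // placeholder for the real per-record work
                }
                return local;
            }, pool));
        }
        List<Integer> merged = new ArrayList<>();
        for (CompletableFuture<List<Integer>> f : futures) {
            merged.addAll(f.join()); // join blocks until that chunk is done
        }
        return merged;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<List<Integer>> chunks = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
            System.out.println(processAll(chunks, pool)); // prints [2, 4, 6]
        } finally {
            pool.shutdown();
        }
    }
}
```

The merge happens on the calling thread only, so a plain ArrayList is sufficient in this variant.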

  • CompletableFuture

public void export2(int exportType, Long livingId, String timeRange, HttpServletRequest request, HttpServletResponse response) {
    // QueryParaWrapper<TDoctorRecord> tDoctorRecordQueryParams = new QueryParaWrapper<>();
    // TDoctorRecord qdoctorRecord = new TDoctorRecord();
    // qdoctorRecord.setHospitalId(livingId);
    // tDoctorRecordQueryParams.setEntity(qdoctorRecord);
    // Result<List<TDoctorRecord>> listProduceResultModel = tDoctorRecordInterface.queryTDoctorRecordList(tDoctorRecordQueryParams);

    // Doctor records are now fetched from ES instead
    long start1 = System.currentTimeMillis();
    List<ESDoctorRecord> tDoctorRecord = exportHelp.getTDoctorRecord(livingId);
    if (CollectionUtils.isEmpty(tDoctorRecord)) {
        ExportUtils.responseData(response);
        return;
    }
    long start2 = System.currentTimeMillis();
    System.out.println("Query 1 took: " + (start2 - start1) + "ms");
    // tDoctorRecord.stream().forEach(a->{
    // TDoctorRecord tr = new TDoctorRecord();
    // tr.
    // });
    // List<TDoctorRecord> tDoctorRecordList = listProduceResultModel.getData();
    Result<List<UserAndPatientVo>> produceResultModel = tUserInterface.ExportPatient(livingId);
    List<UserAndPatientVo> patientData = produceResultModel.getData();
    if (CollectionUtils.isEmpty(tDoctorRecord) || CollectionUtils.isEmpty(patientData)) {
        ExportUtils.responseData(response);
        return;
    }
    // Drop records that belong to test hospitals
    tDoctorRecord = tDoctorRecord.stream().filter(a -> !Constant.TEST_HOSPITAL.contains(a.getHospitalId())).collect(Collectors.toList());
    System.out.println(tDoctorRecord.size() + "----");
    Map<Long, List<UserAndPatientVo>> patientMap = patientData.stream().collect(Collectors.groupingBy(UserAndPatientVo::getUserId));
    // Group by patient id; within each group, sort by visit time ascending
    Map<Long, List<ESDoctorRecord>> patientIdGroupMap = tDoctorRecord.stream().collect(Collectors.groupingBy(ESDoctorRecord::getUserId));
    // Careful!! This list is written by multiple threads
    List<DoctorRedExportVo> tDoctorRecords = Collections.synchronizedList(new ArrayList<>());
    long start = System.currentTimeMillis();
    System.out.println("Query 2 took: " + (start - start1) + "ms");
    // for (Map.Entry<Long, List<ESDoctorRecord>> entry : patientIdGroupMap.entrySet()) {
    // List<Map<Long, List<ESDoctorRecord>>> maps = new LinkedList<>();
    // maps.add(patientIdGroupMap);
    AtomicInteger i = new AtomicInteger(0);
    // Split the grouped map into chunks and run one task per chunk on the dedicated pool
    List<Map<Long, List<ESDoctorRecord>>> maps = CollUtils.mapChunk(patientIdGroupMap, patientIdGroupMap.size() / 15 + 1);
    CompletableFuture[] cfs = maps.parallelStream()
            .map(n -> CompletableFuture.runAsync(() -> cal(livingId, patientMap, n, tDoctorRecords), excelEventPool)
                    .whenComplete((v, e) -> {
                        System.out.println("Task " + i.incrementAndGet() + Thread.currentThread().getName());
                    })
            ).toArray(CompletableFuture[]::new);
    // Block until every chunk has been processed
    CompletableFuture.allOf(cfs).join();

    //
    // AtomicInteger i = new AtomicInteger(1);
    // List<CompletableFuture<Void>> completableFutureList = new LinkedList<>();
    // completableFutureList=maps.parallelStream().map(n -> {
    // return CompletableFuture.runAsync(() -> {
    // cal(livingId, patientMap, n, tDoctorRecords);
    // System.out.println(i.getAndIncrement());
    // });
    // }).collect(Collectors.toList());
    // CompletableFuture<Void> headerFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[]{}));
    // try {
    // headerFuture.join();
    // } catch (Exception ex) {
    // System.out.println("done count: " + i.get());
    // }
    System.out.println(tDoctorRecords.size() + "--===--");
    tDoctorRecords.sort(Comparator.comparing(DoctorRedExportVo::getUserId).thenComparing(DoctorRedExportVo::getVisitTime));
    long end = System.currentTimeMillis();
    System.out.println("Processing took: " + (end - start) + "ms");
    export(request, response, tDoctorRecords);
    long end2 = System.currentTimeMillis();
    System.out.println("Export took: " + (end2 - end) + "ms");
    System.out.println("Total took: " + (end2 - start1) + "ms");
}

private void cal(Long livingId, Map<Long, List<UserAndPatientVo>> patientMap, Map<Long, List<ESDoctorRecord>> patientIdGroupMap, List<DoctorRedExportVo> tDoctorRecords) {
    patientIdGroupMap.entrySet().stream().forEach((entry) -> {
        List<ESDoctorRecord> valList = entry.getValue();
        // Within each patient's group, sort by visit time ascending
        valList.sort(Comparator.comparing(ESDoctorRecord::getVisitTime));
        int size = valList.size();
        for (int i = 0; i < size; i++) {
            ESDoctorRecord esDoctorRecord = valList.get(i);
            List<UserAndPatientVo> patientList = patientMap.get(esDoctorRecord.getUserId());

            // Skip records that have no matching patient data
            if (CollectionUtils.isEmpty(patientList)) {
                System.out.println(esDoctorRecord.getId() + "====");
                continue;
            }
            DoctorRedExportVo doctorRedExportVo = getWholBasdaiQuestionByUserId(esDoctorRecord);
            BeanUtils.copyProperties(patientList.get(0), doctorRedExportVo);
            doctorRedExportVo.setMobile(patientList.get(0).getMobile());
            doctorRedExportVo.setSymptomsAppearTime(HelpUtils.dateToyyyyMMdd(HelpUtils.yyyyMMddToDate(patientList.get(0).getSymptomsAppearTime())));
            doctorRedExportVo.setSymptomsAppearTreatTime(HelpUtils.dateToyyyyMMdd(HelpUtils.yyyyMMddToDate(patientList.get(0).getSymptomsAppearTreatTime())));
            doctorRedExportVo.setSymptomsDiagnosisTime(HelpUtils.dateToyyyyMMdd(HelpUtils.yyyyMMddToDate(patientList.get(0).getSymptomsDiagnosisTime())));
            doctorRedExportVo.setJoinTime(HelpUtils.dateToyyyyMMdd(HelpUtils.yyyyMMddToDate(patientList.get(0).getJoinTime())));
            Map<String, String> cacheMap = doctorRedCacheHelp.getFirstJoinHospitalCache(livingId);
            doctorRedExportVo.setLivingName(cacheMap.get(esDoctorRecord.getUserId().toString()));
            // Visit number within this patient's group, starting from 1
            doctorRedExportVo.setVisitsNum(i + 1);
            doctorRedExportVo.setVisitTime(HelpUtils.dateToyyyyMMdd(esDoctorRecord.getVisitTime()));
            BeanUtils.copyProperties(esDoctorRecord, doctorRedExportVo);
            doctorRedExportVo.setSex(ExportHelp.sex(doctorRedExportVo.getSex()));
            // The four questionnaire lookups run in parallel. No executor is passed, so they
            // use the common ForkJoinPool, and they are not joined (the join below is commented
            // out), so they may still be running when the record is added to the list.
            CompletableFuture<Void> f1 = CompletableFuture
                    .runAsync(() -> {
                        getDoctRedInfo(esDoctorRecord, doctorRedExportVo);
                    });
            CompletableFuture<Void> f2 = CompletableFuture
                    .runAsync(() -> {
                        BASFI_ASAS_HI(esDoctorRecord, doctorRedExportVo);
                    });
            CompletableFuture<Void> f3 = CompletableFuture
                    .runAsync(() -> {
                        yearQuestion(esDoctorRecord, doctorRedExportVo);
                    });
            CompletableFuture<Void> f4 = CompletableFuture
                    .runAsync(() -> {
                        asdas(esDoctorRecord, doctorRedExportVo);
                    });
            // getDoctRedInfo(esDoctorRecord, doctorRedExportVo);
            // BASFI_ASAS_HI(esDoctorRecord, doctorRedExportVo);
            // yearQuestion(esDoctorRecord, doctorRedExportVo);
            // asdas(esDoctorRecord, doctorRedExportVo);
            // CompletableFuture.allOf(f1,f2,f3,f4).join();

            tDoctorRecords.add(doctorRedExportVo);
            // }
        }
    });
}

  • CountDownLatch

public void export(int exportType, Long livingId, String timeRange, HttpServletRequest request, HttpServletResponse response) {
    // Doctor records are now fetched from ES instead
    long start1 = System.currentTimeMillis();
    List<ESDoctorRecord> tDoctorRecord = exportHelp.getTDoctorRecord(livingId);
    if (CollectionUtils.isEmpty(tDoctorRecord)) {
        ExportUtils.responseData(response);
        return;
    }
    long start2 = System.currentTimeMillis();
    System.out.println("Query 1 took: " + (start2 - start1) + "ms");
    Result<List<UserAndPatientVo>> produceResultModel = tUserInterface.ExportPatient(livingId);
    List<UserAndPatientVo> patientData = produceResultModel.getData();
    if (CollectionUtils.isEmpty(tDoctorRecord) || CollectionUtils.isEmpty(patientData)) {
        ExportUtils.responseData(response);
        return;
    }
    // Drop records that belong to test hospitals
    tDoctorRecord = tDoctorRecord.stream().filter(a -> !Constant.TEST_HOSPITAL.contains(a.getHospitalId())).collect(Collectors.toList());
    System.out.println(tDoctorRecord.size() + "----");
    Map<Long, List<UserAndPatientVo>> patientMap = patientData.stream().collect(Collectors.groupingBy(UserAndPatientVo::getUserId));
    // Group by patient id; within each group, sort by visit time ascending
    Map<Long, List<ESDoctorRecord>> patientIdGroupMap = tDoctorRecord.stream().collect(Collectors.groupingBy(ESDoctorRecord::getUserId));
    // Careful!! This list is written by multiple threads
    List<DoctorRedExportVo> tDoctorRecords = Collections.synchronizedList(new ArrayList<>());
    long start = System.currentTimeMillis();
    System.out.println("Query 2 took: " + (start - start1) + "ms");
    // for (Map.Entry<Long, List<ESDoctorRecord>> entry : patientIdGroupMap.entrySet()) {
    List<Map<Long, List<ESDoctorRecord>>> maps = CollUtils.mapChunk(patientIdGroupMap, patientIdGroupMap.size() / 15 + 1);
    // One count per chunk; each Runner counts down when it finishes
    CountDownLatch countDownLatch = new CountDownLatch(maps.size());
    for (Map<Long, List<ESDoctorRecord>> m : maps) {
        excelEventPool.submit(new Runner(livingId, patientMap, m, tDoctorRecords, countDownLatch));
    }
    try {
        // Block the calling thread until every chunk has been processed
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so callers can see the interruption
        Thread.currentThread().interrupt();
    }

    // Deadlocks when used with a fixed-size thread pool!!
    // CyclicBarrier cyclicBarrier = new CyclicBarrier(maps.size() + 1);

    // for (Map<Long, List<ESDoctorRecord>> m : maps) {
    // excelEventPool.submit(new Runner(livingId, patientMap, m, tDoctorRecords, cyclicBarrier));
    // new Thread(new Runner(livingId, patientMap, m, tDoctorRecords, cyclicBarrier)).start();
    // }
    // maps.parallelStream().map(n->{
    // new Thread(new Runner(livingId, patientMap, n, tDoctorRecords, cyclicBarrier)).start();
    // return null;
    // });

    // try {
    // cyclicBarrier.await();
    // } catch (InterruptedException e) {
    // e.printStackTrace();
    // } catch (BrokenBarrierException e) {
    // e.printStackTrace();
    // }
    System.out.println("-----------\nAll threads finished!");
    // cal(livingId, patientMap, patientIdGroupMap, tDoctorRecords);
    System.out.println(tDoctorRecords.size() + "--===--");
    tDoctorRecords.sort(Comparator.comparing(DoctorRedExportVo::getUserId).thenComparing(DoctorRedExportVo::getVisitTime));
    long end = System.currentTimeMillis();
    System.out.println("Processing took: " + (end - start) + "ms");
    export(request, response, tDoctorRecords);
    long end2 = System.currentTimeMillis();
    System.out.println("Export took: " + (end2 - end) + "ms");
    System.out.println("Total took: " + (end2 - start1) + "ms");
}



// Processes one chunk of the grouped map and counts the latch down when done
class Runner implements Runnable {
    private CountDownLatch countDownLatch;
    private Long livingId;
    private Map<Long, List<UserAndPatientVo>> patientMap;
    private Map<Long, List<ESDoctorRecord>> patientIdGroupMap;
    private List<DoctorRedExportVo> tDoctorRecords;

    public Runner(Long livingId, Map<Long, List<UserAndPatientVo>> patientMap, Map<Long, List<ESDoctorRecord>> patientIdGroupMap, List<DoctorRedExportVo> tDoctorRecords, CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
        this.livingId = livingId;
        this.patientMap = patientMap;
        this.patientIdGroupMap = patientIdGroupMap;
        this.tDoctorRecords = tDoctorRecords;
    }

    @Override
    public void run() {
        try {
            cal(livingId, patientMap, patientIdGroupMap, tDoctorRecords);
            // countDownLatch.await();
            // System.out.println(Thread.currentThread().getName() + " finished");
        } finally {
            // Count down in finally so the main thread is released even if cal throws
            countDownLatch.countDown();
            System.out.println(Thread.currentThread().getName() + " finished");
        }
    }
}
