多线程回调

编程入门 行业动态 更新时间:2024-10-13 02:15:45

多线程<a href="https://www.elefans.com/category/jswz/34/1771356.html">回调</a>

多线程回调

第一种:

package com.example.demo.test;import org.apache.http.concurrent.FutureCallback;import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CallBackTest {public static List<String> mai1(List<String> stringList) throws InterruptedException {//利用计数器阻塞主线程,在全部回调结束后再close(源码是继承了通道同步锁)final CountDownLatch countDownLatch = new CountDownLatch(100000);//线程池(事例中自定义了时间限制)ExecutorService executorService = Executors.newFixedThreadPool(5);FutureCallback<String> futureCallback = new FutureCallback<String>() {@Overridepublic void completed(String s) {stringList.add(s);countDownLatch.countDown();}@Overridepublic void failed(Exception e) {countDownLatch.countDown();}@Overridepublic void cancelled() {countDownLatch.countDown();}};for (int i = 0; i < 100000; i++) {final Integer i1 = i;executorService.execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);futureCallbackpleted(String.valueOf(i1));} catch (InterruptedException e) {e.printStackTrace();}}});}//阻塞主线程,设置超时时间2分钟,避免因子线程处理异常而长时间一直等待countDownLatch.await(2, TimeUnit.MINUTES);//关闭executorService.shutdown();return stringList;}public static void main(String[] args) throws InterruptedException {long l = System.currentTimeMillis();List<String> stringList = new LinkedList<>();List<String> stringList1 = mai1(stringList);System.out.println(stringList1);long l1 = System.currentTimeMillis() - l;System.out.println(l1);}}

第二种:

package com.example.demo.test;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

/**
 * A {@link Callable} that adds 100 to its input value, publishes the result in a map under
 * the key {@code "data"}, and counts down a shared {@link CountDownLatch} when it finishes.
 */
public class CallableThread implements Callable<Map<String, Integer>> {

    /** Input value passed in by the creator; incremented by 100 when the task runs. */
    int data;

    /** Result map returned from {@link #call()}. */
    private final Map<String, Integer> map = new HashMap<String, Integer>();

    /** Latch counted down exactly once per {@link #call()} invocation. */
    private final CountDownLatch latch;

    /**
     * @param data  starting value
     * @param latch latch to count down once the task has run
     */
    public CallableThread(int data, CountDownLatch latch) {
        this.data = data;
        this.latch = latch;
    }

    /**
     * Adds 100 to {@code data}, stores it under {@code "data"} and prints the worker thread name.
     *
     * <p>The latch is counted down in {@code finally}, so waiters are released even when the
     * body fails. Failures are no longer swallowed: they propagate to the caller (or to the
     * {@code Future} when run by an executor).
     *
     * @return the map holding the updated value under {@code "data"}
     * @throws Exception if the computation fails
     */
    @Override
    public Map<String, Integer> call() throws Exception {
        try {
            data += 100;
            map.put("data", data);
            System.out.println("线程:" + Thread.currentThread().getName());
        } finally {
            latch.countDown();
        }
        return map;
    }
}
package com.example.demo.test;import org.jetbrains.annotations.NotNull;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Demo driver: submits five {@code CallableThread} tasks to a fixed pool, waits on a shared
 * latch until all of them have run, then prints each task's {@code "data"} result.
 */
public class ThreadTest {

    /**
     * Runs the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(5);
        ExecutorService pool = Executors.newFixedThreadPool(5);
        // Typed list of futures, so no raw types or unchecked casts are needed when reading back.
        List<Future<Map<String, Integer>>> list = new ArrayList<>();
        try {
            for (int j = 0; j < 5; j++) {
                Callable<Map<String, Integer>> c1 = new CallableThread(5, latch);
                list.add(pool.submit(c1));
            }
            // Wait until every task has counted the latch down.
            latch.await();
            for (Future<Map<String, Integer>> f1 : list) {
                System.out.println(f1.get().get("data"));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Without this the pool's non-daemon workers keep the JVM alive after main returns.
            pool.shutdown();
        }
    }
}

公司异步调用的例子:

package com.bsj.util.map;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bsj.util.DataUntil;
import com.bsj.util.ExceptionUtil;
import com.bsj.util.GeoUntil;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.entity.DeflateDecompressingEntity;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** 地理位置工具类** @author 杨小华* @date 2018/5/3 16:54* @since v3*/
@Deprecated
public class OldMapUtil {private static Logger logger = Logger.getLogger(OldMapUtil.class);/*** 公有的异步获取地址** @param geoList* @return java.util.List<com.bsj.util.map.MapDto>* @author 杨小华* @date 2018/5/3 16:54* @since v3*/public static List<MapDto> handleCommonMap(List<MapDto> geoList) throws InterruptedException {// 提前校验if(geoList == null || geoList.isEmpty()) {return new LinkedList<>();}//1.纠偏并排序int geoSize = geoList.size();MapDto mapDto = null;for (int i = 0; i < geoSize; i++) {mapDto = geoList.get(i);// 纠偏double[] latLon = GeoUntil.transform(mapDto.getLat(), mapDto.getLon());mapDto.setLat(latLon[0]);mapDto.setLon(latLon[1]);}//2.异步去获取地址long t1 = System.currentTimeMillis();JSONArray array = getAsyncAddress(geoList);//重新排序GeoUntil.geoListSort(array);System.out.println("传入数量:" + geoSize + ",输出数量" + array.size() + "耗时:" + (System.currentTimeMillis() - t1));// 3.对数据进行处理return arrayToMapDtoList(array);}/*** 返回数据处理** @param array* @return java.util.List<com.bsj.util.map.MapDto>* @author 杨小华* @date 2018/5/4 11:09* @since v3*/private static List<MapDto> arrayToMapDtoList(JSONArray array) {List<MapDto> geoAddressList = new LinkedList<>();MapDto mapDto = null;for (int i = 0; i < array.size(); i++) {mapDto = new MapDto();JSONObject object = (JSONObject) array.get(i);JSONObject code = (JSONObject) object.get("regeocode");Object tagObj = object.get("tag");if(tagObj != null){int tag = Integer.valueOf(tagObj.toString());mapDto.setTag(tag);}StringBuilder str = new StringBuilder();if (code != null) {mapDto.setAddress(code.get("formatted_address").toString());JSONArray roads = (JSONArray)code.get("roads");if(roads != null && !roads.isEmpty()) {for (int j = 0; j < roads.size(); j++) {if (j == 0) {str.append("(");} else {str.append(" ");}JSONObject roadObj = (JSONObject) roads.get(j);str.append("在").append(roadObj.getString("name")).append("的").append(roadObj.getString("direction")).append("方向约").append((int) 
Math.floor(roadObj.getDouble("distance"))).append("米");}str.append(")");mapDto.setRoads(str.toString());}}geoAddressList.add(mapDto);}return geoAddressList;}/*** 异步获取地址** @param geoList* @return com.alibaba.fastjson.JSONArray* @author 杨小华* @date 2018/5/4 10:24* @since v3*/private static JSONArray getAsyncAddress(List<MapDto> geoList) throws InterruptedException {JSONArray allList = new JSONArray();List<List<MapDto>> splitData = DataUntil.splitData(geoList, 100);//利用计数器阻塞主线程,在全部回调结束后再closefinal CountDownLatch countDownLatch = new CountDownLatch(splitData.size());//回调FutureCallback<HttpResponse> callback = new FutureCallback<HttpResponse>() {@Overridepublic void completed(final HttpResponse response) {
//                System.out.println("成功线程 callback thread id is : " + Thread.currentThread().getId());try {HttpEntity httpEntity = response.getEntity();// gzip 另外处理,否则乱码if (httpEntity.getContentEncoding() != null) {if ("gzip".equalsIgnoreCase(httpEntity.getContentEncoding().getValue())) {httpEntity = new GzipDecompressingEntity(httpEntity);} else if ("deflate".equalsIgnoreCase(httpEntity.getContentEncoding().getValue())) {httpEntity = new DeflateDecompressingEntity(httpEntity);}}// 取出应答字符串String resultStr = EntityUtils.toString(httpEntity, "UTF-8");//判断值返回是否正确if (StringUtils.isNotEmpty(resultStr)) {Object o = GeoUntil.handleGeo(resultStr);if (o != null) {JSONArray list = (JSONArray) o;allList.addAll(list);}else{logger.error("@@返回解析为空的值:" + resultStr);}}else{logger.error("@@返回空的值:" + resultStr);}} catch (Exception e) {logger.error("MapUtil异步查询失败:" + ExceptionUtil.getStackStr(e));}//计数器减,执行完再减,否则会出现数据少100条的情况countDownLatch.countDown();}@Overridepublic void failed(final Exception ex) {//计数器减countDownLatch.countDown();System.out.println("失败线程callback thread id is : " + Thread.currentThread().getId() + ",error:" + ex);}@Overridepublic void cancelled() {//计数器减countDownLatch.countDown();System.out.println("取消线程callback thread id is : " + Thread.currentThread().getId());}};HttpAsyncUtil util = new HttpAsyncUtil();String url = "http://120.76.69.92:8080/geo/GetGeo.json?";
//        String url = PropertiesUtil.getConfig("servicesGeoUrl");ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);for (List<MapDto> mapDtoList : splitData) {// 使用线程池,1w条 提升5秒左右fixedThreadPool.execute(new Runnable() {@Overridepublic void run() {// 异步发送Map<String, Object> param = new HashMap<>();param.put("posList", mapDtoList);String paramStr = "param=" + JSON.toJSONString(param);paramStr = paramStr + "&jsoncallback=";final HttpPost httpPost = new HttpPost(url);StringEntity stringEntity = new StringEntity(paramStr, "utf-8");httpPost.setEntity(stringEntity);util.sendHttpPost(httpPost, callback);}});}//阻塞主线程,设置超时时间2分钟,避免因子线程处理异常而长时间一直等待countDownLatch.await(2, TimeUnit.MINUTES);//关闭fixedThreadPool.shutdown();util.close();return allList;}public static void main(String[] args) throws InterruptedException {List<MapDto> geoList = new LinkedList<>();for (int i = 0; i < 10000; i++) {MapDto mapDto = new MapDto();mapDto.setLat(23.32004);mapDto.setLon(114.01560);mapDto.setTag(1+i);geoList.add(mapDto);}List<MapDto> mapDtos = handleCommonMap(geoList);int index = 0;for(MapDto dto : mapDtos){String address = dto.getAddress();if(!"广东省惠州市博罗县罗浮山管委会544乡道".equals(address)){System.out.println(address);index ++;}}System.err.println("错误的:" + index);System.out.println(mapDtos == null ? 0 : mapDtos.size());}}

http调用:

package com.bsj.util.map;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bsj.util.DataUntil;
import com.bsj.util.ExceptionUtil;
import com.bsj.util.GeoUntil;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.entity.DeflateDecompressingEntity;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** 地理位置工具类** @author 杨小华* @date 2018/5/3 16:54* @since v3*/
@Deprecated
public class OldMapUtil {private static Logger logger = Logger.getLogger(OldMapUtil.class);/*** 公有的异步获取地址** @param geoList* @return java.util.List<com.bsj.util.map.MapDto>* @author 杨小华* @date 2018/5/3 16:54* @since v3*/public static List<MapDto> handleCommonMap(List<MapDto> geoList) throws InterruptedException {// 提前校验if(geoList == null || geoList.isEmpty()) {return new LinkedList<>();}//1.纠偏并排序int geoSize = geoList.size();MapDto mapDto = null;for (int i = 0; i < geoSize; i++) {mapDto = geoList.get(i);// 纠偏double[] latLon = GeoUntil.transform(mapDto.getLat(), mapDto.getLon());mapDto.setLat(latLon[0]);mapDto.setLon(latLon[1]);}//2.异步去获取地址long t1 = System.currentTimeMillis();JSONArray array = getAsyncAddress(geoList);//重新排序GeoUntil.geoListSort(array);System.out.println("传入数量:" + geoSize + ",输出数量" + array.size() + "耗时:" + (System.currentTimeMillis() - t1));// 3.对数据进行处理return arrayToMapDtoList(array);}/*** 返回数据处理** @param array* @return java.util.List<com.bsj.util.map.MapDto>* @author 杨小华* @date 2018/5/4 11:09* @since v3*/private static List<MapDto> arrayToMapDtoList(JSONArray array) {List<MapDto> geoAddressList = new LinkedList<>();MapDto mapDto = null;for (int i = 0; i < array.size(); i++) {mapDto = new MapDto();JSONObject object = (JSONObject) array.get(i);JSONObject code = (JSONObject) object.get("regeocode");Object tagObj = object.get("tag");if(tagObj != null){int tag = Integer.valueOf(tagObj.toString());mapDto.setTag(tag);}StringBuilder str = new StringBuilder();if (code != null) {mapDto.setAddress(code.get("formatted_address").toString());JSONArray roads = (JSONArray)code.get("roads");if(roads != null && !roads.isEmpty()) {for (int j = 0; j < roads.size(); j++) {if (j == 0) {str.append("(");} else {str.append(" ");}JSONObject roadObj = (JSONObject) roads.get(j);str.append("在").append(roadObj.getString("name")).append("的").append(roadObj.getString("direction")).append("方向约").append((int) 
Math.floor(roadObj.getDouble("distance"))).append("米");}str.append(")");mapDto.setRoads(str.toString());}}geoAddressList.add(mapDto);}return geoAddressList;}/*** 异步获取地址** @param geoList* @return com.alibaba.fastjson.JSONArray* @author 杨小华* @date 2018/5/4 10:24* @since v3*/private static JSONArray getAsyncAddress(List<MapDto> geoList) throws InterruptedException {JSONArray allList = new JSONArray();List<List<MapDto>> splitData = DataUntil.splitData(geoList, 100);//利用计数器阻塞主线程,在全部回调结束后再closefinal CountDownLatch countDownLatch = new CountDownLatch(splitData.size());//回调FutureCallback<HttpResponse> callback = new FutureCallback<HttpResponse>() {@Overridepublic void completed(final HttpResponse response) {
//                System.out.println("成功线程 callback thread id is : " + Thread.currentThread().getId());try {HttpEntity httpEntity = response.getEntity();// gzip 另外处理,否则乱码if (httpEntity.getContentEncoding() != null) {if ("gzip".equalsIgnoreCase(httpEntity.getContentEncoding().getValue())) {httpEntity = new GzipDecompressingEntity(httpEntity);} else if ("deflate".equalsIgnoreCase(httpEntity.getContentEncoding().getValue())) {httpEntity = new DeflateDecompressingEntity(httpEntity);}}// 取出应答字符串String resultStr = EntityUtils.toString(httpEntity, "UTF-8");//判断值返回是否正确if (StringUtils.isNotEmpty(resultStr)) {Object o = GeoUntil.handleGeo(resultStr);if (o != null) {JSONArray list = (JSONArray) o;allList.addAll(list);}else{logger.error("@@返回解析为空的值:" + resultStr);}}else{logger.error("@@返回空的值:" + resultStr);}} catch (Exception e) {logger.error("MapUtil异步查询失败:" + ExceptionUtil.getStackStr(e));}//计数器减,执行完再减,否则会出现数据少100条的情况countDownLatch.countDown();}@Overridepublic void failed(final Exception ex) {//计数器减countDownLatch.countDown();System.out.println("失败线程callback thread id is : " + Thread.currentThread().getId() + ",error:" + ex);}@Overridepublic void cancelled() {//计数器减countDownLatch.countDown();System.out.println("取消线程callback thread id is : " + Thread.currentThread().getId());}};HttpAsyncUtil util = new HttpAsyncUtil();String url = "http://120.76.69.92:8080/geo/GetGeo.json?";
//        String url = PropertiesUtil.getConfig("servicesGeoUrl");ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);for (List<MapDto> mapDtoList : splitData) {// 使用线程池,1w条 提升5秒左右fixedThreadPool.execute(new Runnable() {@Overridepublic void run() {// 异步发送Map<String, Object> param = new HashMap<>();param.put("posList", mapDtoList);String paramStr = "param=" + JSON.toJSONString(param);paramStr = paramStr + "&jsoncallback=";final HttpPost httpPost = new HttpPost(url);StringEntity stringEntity = new StringEntity(paramStr, "utf-8");httpPost.setEntity(stringEntity);util.sendHttpPost(httpPost, callback);}});}//阻塞主线程,设置超时时间2分钟,避免因子线程处理异常而长时间一直等待countDownLatch.await(2, TimeUnit.MINUTES);//关闭fixedThreadPool.shutdown();util.close();return allList;}public static void main(String[] args) throws InterruptedException {List<MapDto> geoList = new LinkedList<>();for (int i = 0; i < 10000; i++) {MapDto mapDto = new MapDto();mapDto.setLat(23.32004);mapDto.setLon(114.01560);mapDto.setTag(1+i);geoList.add(mapDto);}List<MapDto> mapDtos = handleCommonMap(geoList);int index = 0;for(MapDto dto : mapDtos){String address = dto.getAddress();if(!"广东省惠州市博罗县罗浮山管委会544乡道".equals(address)){System.out.println(address);index ++;}}System.err.println("错误的:" + index);System.out.println(mapDtos == null ? 0 : mapDtos.size());}}

更多推荐

多线程回调

本文发布于:2024-03-05 23:50:26,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1713785.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:回调   多线程

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!