package org.las2mile.okio;

import org.las2mile.okio.message.BaseMsg;
import org.las2mile.okio.message.req.HartBeatMsg;
import org.las2mile.okio.message.req.LogFileMsgReq;
import org.las2mile.okio.message.resp.HartBeatMsgResp;
import org.las2mile.okio.server.LogZipFileProcedure;
import org.las2mile.okio.message.req.LogFileLostMsg;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import okio.Buffer;
import okio.BufferedSink;
import okio.BufferedSource;
import okio.Okio;

import org.las2mile.scrcpy.BuildConfig;

/**
 * Accept-loop thread that listens on TCP port {@value #PORT} and hands every
 * accepted connection to a cached thread pool.
 *
 * <p>Each connection is greeted with a {@link HartBeatMsgResp} carrying
 * {@code BuildConfig.BUILD_TIME}; afterwards incoming bytes are decoded into
 * {@link OkIoPacket}s and dispatched on {@code packet.cmd}.
 */
public class OkIOServer extends Thread {

    /** Port the server socket is bound to. */
    private static final int PORT = 4567;

    /** Maximum number of bytes requested from the socket per read. */
    private static final long READ_CHUNK_BYTES = 8192L;

    private final ExecutorService executor;
    private ServerSocket serverSocket;

    /** Creates the server; the listening socket is only opened once the thread runs. */
    public OkIOServer() {
        this.executor = Executors.newCachedThreadPool();
    }

    /**
     * Opens the listening socket and accepts clients until the socket is closed.
     *
     * <p>Returns immediately if the port cannot be opened. A failed accept is
     * logged and the loop continues unless the server socket has been closed.
     */
    @Override
    public void run() {
        try {
            serverSocket = new ServerSocket(PORT);
        } catch (IOException e) {
            Ln.e("Socket error when open:" + e.getMessage());
            return;
        }
        Ln.i("Server Socket open success: " + serverSocket.getInetAddress());

        while (true) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                Ln.i("Client Socket accept success from: ");
                final BufferedSource source = Okio.buffer(Okio.source(socket));
                final BufferedSink sink = Okio.buffer(Okio.sink(socket));
                process(socket, source, sink);
            } catch (IOException e) {
                Ln.e("Socket error in serverSocket accept(): " + e.getMessage());
                // The socket may already be accepted when stream setup fails; do not leak it.
                closeQuietly(socket);
                if (serverSocket.isClosed()) {
                    break;
                }
            }
        }
    }

    /**
     * Schedules the handling of one client connection on the executor.
     *
     * @param socket the accepted client socket; closed once the client's stream ends
     * @param source buffered input wrapped around {@code socket}
     * @param sink   buffered output wrapped around {@code socket}
     */
    private void process(final Socket socket, final BufferedSource source, final BufferedSink sink) {
        Ln.e("Server handle socket ");
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    serveClient(source, sink);
                } catch (Exception e) {
                    Ln.e("server process failure:" + e.getMessage());
                    // NOTE(review): any handler failure terminates the whole process, not
                    // just this connection. Kept as-is; confirm this is intended.
                    System.exit(0);
                } finally {
                    // Closing the socket also closes its input and output streams.
                    closeQuietly(socket);
                }
            }
        });
    }

    /**
     * Sends the greeting and then reads, decodes and dispatches packets until
     * the client's stream reaches its end.
     *
     * @param source input to read request bytes from
     * @param sink   output that responses are written to
     * @throws Exception if reading, decoding or responding fails
     */
    private void serveClient(BufferedSource source, BufferedSink sink) throws Exception {
        HartBeatMsgResp version = new HartBeatMsgResp();
        version.setRemoteInfo(BuildConfig.BUILD_TIME);
        sendMsg(version, sink);

        // Procedures registered by this connection, keyed by task id. The key is typed
        // as Object because the declared type of taskId lives in another file.
        Map<Object, LogZipFileProcedure> procedures = new HashMap<>();
        Buffer buffer = new Buffer();
        OkIoPacketDecode decoder = new OkIoPacketDecode("server ");

        for (long byteCount; (byteCount = source.read(buffer, READ_CHUNK_BYTES)) != -1; ) {
            List<OkIoPacket> packets = decoder.parsePacketC(buffer, (int) byteCount);
            if (packets == null) {
                continue;
            }
            for (OkIoPacket packet : packets) {
                handlePacket(packet, sink, procedures);
            }
        }
    }

    /**
     * Dispatches one decoded packet on its command code. Packets with any other
     * command code are only logged.
     *
     * @param packet     the decoded packet
     * @param sink       output that responses are written to
     * @param procedures procedures registered by this connection, keyed by task id
     * @throws Exception if decoding the message or sending a response fails
     */
    private void handlePacket(OkIoPacket packet, BufferedSink sink,
                              Map<Object, LogZipFileProcedure> procedures) throws Exception {
        Ln.e("server process cmd " + packet.cmd);

        if (packet.cmd == BaseMsg.REQ_REQUEST_LOGFILE) {
            LogFileMsgReq request = new LogFileMsgReq();
            request.decodeData(packet.mBaseMsgBuffer, (short) packet.mBaseMsgBuffer.size());

            LogZipFileProcedure procedure = new LogZipFileProcedure(sink, request.procedureType);
            procedure.createTask(request.taskId);
            // Remember the procedure so a later lost-partition request can find it.
            procedures.put(procedure.taskId, procedure);
            procedure.uploadImageAllPartitions();
        } else if (packet.cmd == BaseMsg.REQ_HARTBEAT) {
            HartBeatMsg heartbeat = new HartBeatMsg();
            heartbeat.decodeData(packet.mBaseMsgBuffer, (short) packet.mBaseMsgBuffer.size());
            Ln.e("server process heartbeat " + heartbeat.toString());

            sendMsg(new HartBeatMsgResp(), sink);
        } else if (packet.cmd == BaseMsg.REQ_REQUEST_LOGFILE_LOST) {
            LogFileLostMsg lost = new LogFileLostMsg();
            lost.decodeData(packet.mBaseMsgBuffer, (short) packet.mBaseMsgBuffer.size());

            if (lost.lostNums > 0) {
                LogZipFileProcedure procedure = procedures.get(lost.taskId);
                if (procedure == null) {
                    // An unknown task id must not bring the connection (and process) down.
                    Ln.e("server process lost request for unknown task " + lost.taskId);
                } else {
                    procedure.uploadImageLeftPartitions(lost.lostNos);
                }
            } else {
                // Nothing is missing any more: drop the procedure for this task.
                procedures.remove(lost.taskId);
            }
        }
    }

    /**
     * Assembles {@code baseMsg} into a packet, writes it to {@code sink} and flushes.
     *
     * @param baseMsg the message to send
     * @param sink    output the packet bytes are written to
     * @throws Exception if assembling or writing the packet fails
     */
    private void sendMsg(BaseMsg baseMsg, BufferedSink sink) throws Exception {
        OkIoPacket respPkt = new OkIoPacket();
        byte[] dataResp = respPkt.assemblePacket(baseMsg);
        Ln.i(dataResp.length + " server send ");
        sink.write(dataResp);
        sink.flush();
    }

    /**
     * Closes {@code socket}, logging instead of propagating a failure.
     *
     * @param socket the socket to close; {@code null} is ignored
     */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException e) {
            Ln.e("Socket error when close: " + e.getMessage());
        }
    }
}

/*
    // Parse the request
    String requestLine = source.readUtf8LineStrict();   // read one line

    Buffer buffer = new Buffer();
    //buffer.writeAll(source);
    buffer.write(source, bodySize);   // write the remainder into a new buffer

    sink.writeUtf8(status.toString());
    sink.flush();

    response.getBody= Buffer buffer = new Buffer().writeUtf8(body);
    response.getBody().readAll(sink);
    sink.flush();   // send the remainder to the client

    // Close the connection
    try {
        source.close();
        sink.close();
    } catch (IOException e) {
        ZLog.e("error in socket and stream close: " + e.getMessage() + socket.isClosed());
    } catch (Throwable e) {
        ZLog.e(e.getMessage());
    } finally {
        try {
            socket.close();
        } catch (IOException e) {
            ZLog.e(e.getMessage());
        }
    }
*/