 2021-03-11   2.8k

Java Basics: The Evolution of RPC

Reference:

https://www.bilibili.com/video/BV1zE41147Zq?from=search&seid=13740626242455157002

First, define a User class with just two fields, id and name, that implements the Serializable interface.

User.java

package com.ca01h.rpc.common;

import java.io.Serializable;

public class User implements Serializable {
    private Integer id;
    private String name;

    public User(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

Next, define an IUserService interface with a single method, findUserById, which returns a User object for a given id.

IUserService.java

package com.ca01h.rpc.common;

public interface IUserService {
    User findUserById(Integer id);
}

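Version 1

Let's start with the first and most primitive RPC implementation. The common package above declares the IUserService interface; here we implement it. To keep things simple, the name field is a fixed value and the id field is passed in as an argument.

UserServiceImpl.java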

package com.ca01h.rpc.rpc01;

import com.ca01h.rpc.common.IUserService;
import com.ca01h.rpc.common.User;

public class UserServiceImpl implements IUserService {

    @Override
    public User findUserById(Integer id) {
        return new User(id, "ca01h");
    }
}

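Next comes the server. In this example, the core of the server is to receive the id sent by the client, call findUserById, and return a User object to the client. A User is not a primitive type, though, and we have not yet defined how it is serialized and deserialized on the wire, so this first version simply sends the two fields, id and name, back one at a time.

Server.java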

package com.ca01h.rpc.rpc01;

import com.ca01h.rpc.common.IUserService;
import com.ca01h.rpc.common.User;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    private static boolean running = true;

    public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket(8888);
        while (running) {
            // accept() blocks until a client connects; clients are served one at a time.
            Socket s = ss.accept();
            process(s);
            s.close();
        }
        ss.close();
    }

    private static void process(Socket s) throws Exception {
        InputStream in = s.getInputStream();
        OutputStream out = s.getOutputStream();
        DataInputStream dis = new DataInputStream(in);
        DataOutputStream dos = new DataOutputStream(out);

        // Read the id, look up the user, and write its two fields back one by one.
        int id = dis.readInt();
        IUserService service = new UserServiceImpl();
        User user = service.findUserById(id);
        dos.writeInt(user.getId());
        dos.writeUTF(user.getName());
        dos.flush();
    }
}

Finally, the client. It first connects to the port opened by the server, sends the id, and then does a blocking read of the result the server sends back.

Client.java

package com.ca01h.rpc.rpc01;

import com.ca01h.rpc.common.User;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;

public class Client {
    public static void main(String[] args) throws Exception {
        Socket s = new Socket("127.0.0.1", 8888);
        // Encode the request (just the id) into a byte array first.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(123);

        s.getOutputStream().write(baos.toByteArray());
        s.getOutputStream().flush();

        // Block until the server sends back the two fields, then rebuild the User.
        DataInputStream dis = new DataInputStream(s.getInputStream());
        int id = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id, name);

        System.out.println(user);

        dos.close();
        s.close();
    }
}
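
Stripped of the sockets, the wire format that this client and server agree on is just an int followed by a UTF string. The following in-memory sketch makes that explicit; `WireFormatDemo`, `encode`, and `decode` are names made up for this illustration and are not part of the original project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class WireFormatDemo {
    // Hypothetical helper: writes the two fields in the same order as the version 1 server.
    static byte[] encode(int id, String name) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeInt(id);   // 4 bytes
            dos.writeUTF(name); // 2-byte length prefix + encoded characters
        }
        return baos.toByteArray();
    }

    // Hypothetical helper: reads the fields back in exactly the order they were written.
    static String decode(byte[] bytes) throws IOException {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int id = dis.readInt();
            String name = dis.readUTF();
            return "User{id=" + id + ", name='" + name + "'}";
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = encode(123, "ca01h");
        System.out.println(bytes.length);  // prints 11
        System.out.println(decode(bytes)); // prints User{id=123, name='ca01h'}
    }
}
```

Because the reader has to consume fields in exactly the order the writer produced them, adding a field to User means touching both `encode` and `decode`, which is the coupling discussed next.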

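**The problem: the biggest flaw of this first version is that any change to the User class, such as adding fields, or the introduction of a new class such as Product, forces extensive changes to both the client and the server code, which clearly hurts reusability and simplicity.** Let's move on to the second version.

Version 2

The second version makes just one improvement: the details of the network connection and data transfer are moved out of the client into a separate proxy class called Stub.

Stub.java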

package com.ca01h.rpc.rpc02;

import com.ca01h.rpc.common.User;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;

public class Stub {
    public User findUserById(Integer id) throws Exception {
        Socket s = new Socket("127.0.0.1", 8888);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        // Send the id the caller asked for rather than a hard-coded value.
        dos.writeInt(id);

        s.getOutputStream().write(baos.toByteArray());
        s.getOutputStream().flush();

        DataInputStream dis = new DataInputStream(s.getInputStream());
        int receivedId = dis.readInt();
        String name = dis.readUTF();
        User user = new User(receivedId, name);

        dos.close();
        s.close();
        return user;
    }
}

The client then simply calls the findUserById method of the Stub class; the server code does not change.

Client.java

package com.ca01h.rpc.rpc02;

public class Client {
    public static void main(String[] args) throws Exception {
        Stub stub = new Stub();
        System.out.println(stub.findUserById(123));
    }
}

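This version is only a small improvement over the previous one: Stub implements just one method of one class, so the design is still quite weak. On to the third version.

Version 3

The previous version only implemented findUserById. If a new method such as findUserByName were added, we would have to write yet another method in Stub. The approach below avoids adding a Stub method for every new interface method.

Stub.java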

package com.ca01h.rpc.rpc03;

import com.ca01h.rpc.common.IUserService;
import com.ca01h.rpc.common.User;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub() {
        InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1", 8888);

                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(baos);
                // Forward the caller's id (the first argument) instead of a hard-coded value.
                dos.writeInt((Integer) args[0]);

                s.getOutputStream().write(baos.toByteArray());
                s.getOutputStream().flush();

                DataInputStream dis = new DataInputStream(s.getInputStream());
                int id = dis.readInt();
                String name = dis.readUTF();
                User user = new User(id, name);

                dos.close();
                s.close();
                return user;
            }
        };
        // Every call on the returned proxy is routed to h.invoke(...).
        Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class<?>[]{IUserService.class}, h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService) o;
    }
}

Client.java

package com.ca01h.rpc.rpc03;

import com.ca01h.rpc.common.IUserService;

public class Client {
    public static void main(String[] args) {
        IUserService service = Stub.getStub();
        System.out.println(service.findUserById(123));
    }
}

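This uses a dynamic proxy. Put simply, the static method Stub.getStub returns a dynamically generated object that implements the IUserService interface; every call on that object is routed to the invoke method of the InvocationHandler.

Output (the exact proxy class name may differ between JDK versions):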

com.sun.proxy.$Proxy0
interface com.ca01h.rpc.common.IUserService
User{id=123, name='ca01h'}
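
The mechanism can be tried without any networking. The sketch below is a minimal, assumed example: `ProxyDemo` and the `Greeter` interface are hypothetical stand-ins for Stub and IUserService, and the handler merely reports which method was called instead of contacting a server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxyDemo {
    // Hypothetical interface, standing in for IUserService.
    public interface Greeter {
        String greet(String name);
    }

    static Greeter newGreeter() {
        // Every call on the proxy lands in this handler; a real stub would
        // send the method and its arguments over the network here.
        InvocationHandler h = (proxy, method, args) ->
                method.getName() + " called with " + args[0];
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[]{Greeter.class}, h);
    }

    public static void main(String[] args) {
        Greeter g = newGreeter();
        System.out.println(g.greet("ca01h"));                 // prints "greet called with ca01h"
        System.out.println(Proxy.isProxyClass(g.getClass())); // prints "true"
    }
}
```

No class implementing `Greeter` is ever written by hand; the JDK generates one at runtime, which is why the stub does not need a new method for each interface method.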

Version 4

The Stub.java of the previous version can only handle findUserById. In this version, any method can be handled by the same generic code:

Stub.java

package com.ca01h.rpc.rpc04;

import com.ca01h.rpc.common.IUserService;
import com.ca01h.rpc.common.User;

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub() {
        InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1", 8888);

                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());

                String methodName = method.getName();
                Class<?>[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                // The reply is still two raw fields, so the return type is still tied to User.
                DataInputStream dis = new DataInputStream(s.getInputStream());
                int id = dis.readInt();
                String name = dis.readUTF();
                User user = new User(id, name);

                oos.close();
                s.close();
                return user;
            }
        };
        Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class<?>[]{IUserService.class}, h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService) o;
    }
}

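The key is these six lines, which are fairly self-explanatory: the stub sends the method name, the parameter types, and the argument values, and the server uses them to locate the implementation.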

String methodName = method.getName();
Class<?>[] parametersTypes = method.getParameterTypes();
oos.writeUTF(methodName);
oos.writeObject(parametersTypes);
oos.writeObject(args);
oos.flush();
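
The ordering matters here: the receiving side has to read the three items in exactly the order the stub wrote them. A minimal in-memory sketch of that round trip follows, with `RequestCodecDemo`, `encode`, and `decode` as illustrative names that are not part of the original project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

class RequestCodecDemo {
    // Stub side: method name, then parameter types, then argument values.
    static byte[] encode(String methodName, Class<?>[] types, Object[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeUTF(methodName);
            oos.writeObject(types);
            oos.writeObject(args);
        }
        return baos.toByteArray();
    }

    // Server side: read the three items back in the same order and summarize them.
    static String decode(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            String methodName = ois.readUTF();
            Class<?>[] types = (Class<?>[]) ois.readObject();
            Object[] args = (Object[]) ois.readObject();
            return methodName + Arrays.toString(types) + Arrays.toString(args);
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] request = encode("findUserById", new Class<?>[]{Integer.class}, new Object[]{123});
        System.out.println(decode(request)); // prints findUserById[class java.lang.Integer][123]
    }
}
```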

The server code has to change accordingly:

Server.java

package com.ca01h.rpc.rpc04;

import com.ca01h.rpc.common.IUserService;
import com.ca01h.rpc.common.User;
import com.ca01h.rpc.rpc01.UserServiceImpl;

import java.io.*;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    private static boolean running = true;

    public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket(8888);
        while (running) {
            Socket s = ss.accept();
            process(s);
            s.close();
        }
        ss.close();
    }

    private static void process(Socket s) throws Exception {
        InputStream in = s.getInputStream();
        OutputStream out = s.getOutputStream();
        ObjectInputStream ois = new ObjectInputStream(in);
        DataOutputStream dos = new DataOutputStream(out);

        // Read the request in the same order the stub wrote it.
        String methodName = ois.readUTF();
        Class<?>[] parametersTypes = (Class<?>[]) ois.readObject();
        Object[] args = (Object[]) ois.readObject();

        // Find the matching method by name and parameter types, then invoke it.
        IUserService service = new UserServiceImpl();
        Method method = service.getClass().getMethod(methodName, parametersTypes);
        User user = (User) method.invoke(service, args);
        dos.writeInt(user.getId());
        dos.writeUTF(user.getName());
        dos.flush();
    }
}

The server uses reflection to look up and invoke the requested method.
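
Sending the parameter types along with the method name is what lets `getMethod` pick the right overload. A small sketch, assuming a hypothetical `Finder` class with two overloads of `find` (neither it nor `OverloadDemo` appears in the original project):

```java
import java.lang.reflect.Method;

class OverloadDemo {
    // Hypothetical service with two methods that share a name.
    public static class Finder {
        public String find(Integer id) {
            return "by id " + id;
        }

        public String find(String name) {
            return "by name " + name;
        }
    }

    // Same lookup-and-invoke step as the server: name + parameter types select the method.
    static Object call(Object target, String methodName, Class<?>[] types, Object[] args) throws Exception {
        Method m = target.getClass().getMethod(methodName, types);
        return m.invoke(target, args);
    }

    public static void main(String[] args) throws Exception {
        Finder f = new Finder();
        System.out.println(call(f, "find", new Class<?>[]{Integer.class}, new Object[]{1}));      // prints "by id 1"
        System.out.println(call(f, "find", new Class<?>[]{String.class}, new Object[]{"ca01h"})); // prints "by name ca01h"
    }
}
```

With the method name alone, the two `find` methods could not be told apart.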

Version 5

This version solves one problem: the return value is treated as an Object, so any serializable type is supported.

Stub.java

package com.ca01h.rpc.rpc05;

import com.ca01h.rpc.common.IUserService;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub() {
        InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1", 8888);

                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());

                String methodName = method.getName();
                Class<?>[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                // Read the result as a plain Object so the stub is not tied to User.
                ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
                Object result = ois.readObject();

                oos.close();
                s.close();
                return result;
            }
        };
        Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class<?>[]{IUserService.class}, h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService) o;
    }
}

The server changes accordingly:

Server.java

package com.ca01h.rpc.rpc05;

import com.ca01h.rpc.common.IUserService;
import com.ca01h.rpc.rpc01.UserServiceImpl;

import java.io.*;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    private static boolean running = true;

    public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket(8888);
        while (running) {
            Socket s = ss.accept();
            process(s);
            s.close();
        }
        ss.close();
    }

    private static void process(Socket s) throws Exception {
        InputStream in = s.getInputStream();
        OutputStream out = s.getOutputStream();
        ObjectInputStream ois = new ObjectInputStream(in);
        ObjectOutputStream oos = new ObjectOutputStream(out);

        String methodName = ois.readUTF();
        Class<?>[] parametersTypes = (Class<?>[]) ois.readObject();
        Object[] args = (Object[]) ois.readObject();

        IUserService service = new UserServiceImpl();
        Method method = service.getClass().getMethod(methodName, parametersTypes);
        // Write whatever the method returns; no cast to User is needed any more.
        Object result = method.invoke(service, args);
        oos.writeObject(result);
        oos.flush();
    }
}
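
To see the idea of this version work end to end, the following sketch runs a server thread and a stub inside one process. Everything in it is an assumption made for illustration: `LoopbackRpcDemo`, the `Calc` interface, `CalcImpl`, and the helpers `serveOnce`, `stub`, and `run` are not part of the original project, and the server binds to port 0 (any free port) instead of 8888 so the demo does not clash with a running server.

```java
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

class LoopbackRpcDemo {
    // Hypothetical interface whose methods return two different types.
    public interface Calc {
        Integer add(Integer a, Integer b);
        String describe(Integer a);
    }

    public static class CalcImpl implements Calc {
        public Integer add(Integer a, Integer b) { return a + b; }
        public String describe(Integer a) { return "value=" + a; }
    }

    // Accepts one connection, invokes the requested method, writes the result as an Object.
    static void serveOnce(ServerSocket ss, Object service) throws Exception {
        try (Socket s = ss.accept();
             ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
             ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream())) {
            String methodName = ois.readUTF();
            Class<?>[] types = (Class<?>[]) ois.readObject();
            Object[] args = (Object[]) ois.readObject();
            Method m = service.getClass().getMethod(methodName, types);
            oos.writeObject(m.invoke(service, args));
            oos.flush();
        }
    }

    // Client-side proxy: opens one connection per call and returns whatever Object comes back.
    static Calc stub(int port) {
        InvocationHandler h = (proxy, method, args) -> {
            try (Socket s = new Socket("127.0.0.1", port);
                 ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream())) {
                oos.writeUTF(method.getName());
                oos.writeObject(method.getParameterTypes());
                oos.writeObject(args);
                oos.flush();
                ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
                return ois.readObject();
            }
        };
        return (Calc) Proxy.newProxyInstance(Calc.class.getClassLoader(), new Class<?>[]{Calc.class}, h);
    }

    // Makes two remote calls against a server thread that serves exactly two requests.
    static Object[] run() throws Exception {
        try (ServerSocket ss = new ServerSocket(0)) {
            Thread server = new Thread(() -> {
                try {
                    serveOnce(ss, new CalcImpl());
                    serveOnce(ss, new CalcImpl());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            server.start();
            Calc calc = stub(ss.getLocalPort());
            Object[] results = {calc.add(2, 3), calc.describe(5)};
            server.join();
            return results;
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] results = run();
        System.out.println(results[0]); // prints 5
        System.out.println(results[1]); // prints value=5
    }
}
```

The same stub and server code carry both an Integer and a String result, which is the point of treating the return value as an Object.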

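Version 6

In the versions so far, Stub.getStub can only produce a proxy for the single interface IUserService. Now we want it to generate proxies for other interfaces as well.

Client.java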

package com.ca01h.rpc.rpc06;

import com.ca01h.rpc.common.IUserService;

public class Client {
    public static void main(String[] args) {
        IUserService userService = (IUserService) Stub.getStub(IUserService.class);
        System.out.println(userService.findUserById(123));
    }
}

Stub.java

package com.ca01h.rpc.rpc06;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static Object getStub(Class<?> clazz) {
        InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1", 8888);

                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                String clazzName = clazz.getName();
                String methodName = method.getName();
                Class<?>[] parametersTypes = method.getParameterTypes();

                // The interface name goes first so the server knows which service to call.
                oos.writeUTF(clazzName);
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
                Object result = ois.readObject();

                oos.close();
                s.close();
                return result;
            }
        };
        // Build the proxy for whichever interface the caller asked for, not just IUserService.
        Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return o;
    }
}

Server.java

package com.ca01h.rpc.rpc06;

import com.ca01h.rpc.rpc01.UserServiceImpl;

import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    private static boolean running = true;

    public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket(8888);
        while (running) {
            Socket s = ss.accept();
            process(s);
            s.close();
        }
        ss.close();
    }

    private static void process(Socket s) throws Exception {
        InputStream in = s.getInputStream();
        OutputStream out = s.getOutputStream();
        ObjectInputStream ois = new ObjectInputStream(in);
        ObjectOutputStream oos = new ObjectOutputStream(out);

        String clazzName = ois.readUTF();
        String methodName = ois.readUTF();
        Class<?>[] parametersTypes = (Class<?>[]) ois.readObject();
        Object[] args = (Object[]) ois.readObject();

        // A service registry would map clazzName to the implementation class.
        // It is hard-coded here for simplicity.
        Class<?> clazz = UserServiceImpl.class;

        // Look the method up on the implementation class and invoke it on an instance.
        Object service = clazz.getDeclaredConstructor().newInstance();
        Method method = clazz.getMethod(methodName, parametersTypes);
        Object result = method.invoke(service, args);
        oos.writeObject(result);
        oos.flush();
    }
}
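
The service registry mentioned in the server's comment can be made concrete with a map from interface name to implementation class. This is only one possible sketch; `RegistryDemo`, `IHello`, `HelloImpl`, and `dispatch` are hypothetical names, and a real registry might cache instances rather than create one per call.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class RegistryDemo {
    // Hypothetical service interface and implementation.
    public interface IHello {
        String hello(String name);
    }

    public static class HelloImpl implements IHello {
        public String hello(String name) {
            return "hello, " + name;
        }
    }

    // Registry: interface name (as sent by the stub) -> implementation class.
    private static final Map<String, Class<?>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put(IHello.class.getName(), HelloImpl.class);
    }

    // Resolves the implementation, creates an instance, and invokes the requested method.
    static Object dispatch(String interfaceName, String methodName,
                           Class<?>[] types, Object[] args) throws Exception {
        Class<?> impl = REGISTRY.get(interfaceName);
        if (impl == null) {
            throw new IllegalArgumentException("no service registered for " + interfaceName);
        }
        Object target = impl.getDeclaredConstructor().newInstance();
        Method m = impl.getMethod(methodName, types);
        return m.invoke(target, args);
    }

    public static void main(String[] args) throws Exception {
        Object result = dispatch(IHello.class.getName(), "hello",
                new Class<?>[]{String.class}, new Object[]{"ca01h"});
        System.out.println(result); // prints "hello, ca01h"
    }
}
```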

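Version 7

So far we have used the JDK's built-in serialization (Serializable) to serialize and deserialize objects. It is slow and produces large payloads, so for demonstration purposes we can use the Hessian library instead.

HessianUtil.java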

package com.ca01h.rpc.rpc06;

import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class HessianUtil {
    // Serializes an object to a byte array using Hessian 2.
    public static byte[] serialize(Object o) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(baos);
        output.writeObject(o);
        output.flush();

        byte[] bytes = baos.toByteArray();
        output.close();
        baos.close();
        return bytes;
    }

    // Reads the object back from the bytes produced by serialize().
    public static Object unserialize(byte[] bytes) throws Exception {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        Hessian2Input input = new Hessian2Input(bais);
        Object o = input.readObject();

        input.close();
        bais.close();
        return o;
    }
}
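
The size overhead of JDK serialization mentioned at the start of this section can be observed with the standard library alone. The sketch below compares it with the hand-written two-field encoding from version 1; `SerializedSizeDemo` and `Person` are hypothetical names standing in for the article's classes, and no Hessian measurement is attempted because that needs the external library.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializedSizeDemo {
    // Hypothetical stand-in for the article's User class.
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Integer id;
        private final String name;

        Person(Integer id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Number of bytes produced by JDK serialization (includes class metadata).
    static int jdkSize(Serializable o) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        }
        return baos.size();
    }

    // Number of bytes for the same two fields written by hand:
    // a 4-byte int plus a length-prefixed string.
    static int manualSize(int id, String name) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeInt(id);
            dos.writeUTF(name);
        }
        return baos.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("JDK serialization: " + jdkSize(new Person(123, "ca01h")) + " bytes");
        System.out.println("hand-written:      " + manualSize(123, "ca01h") + " bytes");
    }
}
```

The JDK figure is larger because the stream also carries a header and class descriptors, not only the field values.
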
Copyright © ca01h 2019-2021