(ICCV 2019) 6D-GraspNet

6D GraspNet is a grasping paper published by NVIDIA in 2019.

The overall architecture is shown in the figure above.

Grasp Sampler

Given a partially observed object point cloud $X$, we need to generate the corresponding grasp proposals $G^*$. This is a generative-modeling setting: we estimate the posterior distribution $P(G^*|X)$ so that we can obtain $G^*$ for any input $X$. The paper uses a VAE to model the underlying latent variable. A blog post for reviewing VAEs is linked here.

In a VAE, sampling a random variable is involved, and sampling in the original form would block gradient backpropagation. The reparameterization trick therefore writes $z = \mu + \sigma\odot\varepsilon$ so that gradients can flow back, while the stochastic node $\varepsilon \sim N(0, I)$ requires no updates.

We use the VAE to maximize $P(G|X)$. Given a point cloud $X$ and a latent variable $z$, the decoder is a deterministic function that predicts a grasp. The latent space has the prior $P(z)=N(0, I)$, so after receiving a partially observed point cloud $X$ we can sample from this Gaussian repeatedly to obtain different $z$, and the problem becomes maximizing
$$
P(G|X)=\int P(G|X,z;\Theta)P(z)dz
$$
The integral is intractable to evaluate directly, so we introduce an encoder $Q(z|X,g)$ that maps a positive pair $(X, g)$ into a subregion of the latent space. Both the encoder and the decoder are based on PointNet++: the encoder appends the ground-truth grasp $g$ to the point cloud $X$ as per-point features, and the decoder appends the sampled latent $z$ in the same way.
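Training then maximizes the standard variational lower bound (ELBO) on $\log P(G|X)$, i.e. a grasp reconstruction term plus a KL term that pulls $Q(z|X,g)$ toward the prior:
$$
\log P(G|X)\ \ge\ \mathbb{E}_{z\sim Q(z|X,g)}\big[\log P(g|X,z;\Theta)\big]-\mathrm{KL}\big(Q(z|X,g)\,\|\,N(0,I)\big)
$$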

Grasp Pose Evaluation

Some of the grasps predicted by the decoder will succeed while others will fail, so for every predicted pair $\langle X,\hat{g}\rangle$ we want to predict a success probability $P(S|X,g)$; this is essentially a binary classification problem. Instead of simply appending the 16-dimensional flattened 6D grasp pose to the point cloud, the paper uses an encoding that better exploits the structure of point clouds: the gripper rendered at the predicted pose as a point cloud $X_g$ is concatenated with the object point cloud $X$, together with an indicator feature that marks which points belong to the original object.

The input to this binary classification network is the combined object and gripper point cloud, its output is the grasp success probability, and it is optimized with a cross-entropy loss in which label 1 means success and 0 means failure.
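A minimal sketch of one training step, assuming an evaluator with the forward signature shown later in this post; the optimizer and batch variables here are illustrative, not from the repo:

import torch.nn.functional as F

def evaluator_train_step(evaluator, optimizer, pc, gripper_pc, labels):
    # pc: B x N x 3 object points; gripper_pc: B x M x 3 gripper points;
    # labels: B floats, 1.0 for a successful grasp and 0.0 for a failed one.
    logits, _ = evaluator(pc, gripper_pc)  # raw scores, shape B x 1
    loss = F.binary_cross_entropy_with_logits(logits.squeeze(-1), labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()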

For data augmentation, we randomly perturb the positive grasps $g\in G^*$ so that the gripper either collides with the object point cloud or moves away from it, producing augmented negative samples $G^-$.
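A hedged numpy sketch of this hard-negative generation; the perturbation ranges below are illustrative, not the repo's actual values:

import numpy as np

def perturb_grasp(t, euler, max_trans=0.02, max_rot=0.26):
    # Jitter a positive grasp's translation (meters) and Euler angles (radians).
    # The perturbed grasp is then re-checked: if the gripper collides with the
    # object, or its closing volume no longer contains object points, it is
    # relabeled as a negative sample.
    t_neg = t + np.random.uniform(-max_trans, max_trans, size=3)
    euler_neg = euler + np.random.uniform(-max_rot, max_rot, size=3)
    return t_neg, euler_neg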

Iterative Grasp Pose Refinement

Now that the network can predict a set of grasps from a point cloud, can we improve them further? We want a refinement increment $\Delta g$ such that $P(S=1|g+\Delta g) > P(S=1|g)$. We can then perform gradient ascent on the success probability $S$ with respect to the grasp $g$, i.e. follow $\frac{\partial S}{\partial g}$. To preserve the rigid-transformation constraint, the gripper point cloud $X_g$ is obtained by transforming fixed gripper points through a translation vector $t_g$ and Euler angles $R_g=(\alpha_g,\beta_g,\gamma_g)$, a transform written $T(g;p)$ below; by the chain rule, we have
$$
\Delta g=\eta\,\frac{\partial S}{\partial g}=\eta\,\frac{\partial S}{\partial T(g;p)}\,\frac{\partial T(g;p)}{\partial g}
$$
With this we can update the grasps. This step needs no new network; everything works as long as the Grasp Evaluator provides gradients.
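Here $T(g;p)$ denotes a fixed set of gripper control points $p$ transformed by the grasp pose $g$, with $R(\alpha_g,\beta_g,\gamma_g)$ the rotation built from the Euler angles:
$$
T(g;p)=R(\alpha_g,\beta_g,\gamma_g)\,p+t_g
$$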

Creating the Training Set

In simulation, we randomly sample points on the object mesh and align the grasp's z-axis with the point normal. The distance between the gripper and the object surface is sampled uniformly from [0, gripper_length], and the rotation about the z-axis is also sampled randomly. We run simulation only for those grasps whose closing volume overlaps the object; after the object is lifted, a shaking motion is executed, and if the object is still held after shaking, the grasp is counted as a positive sample.
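A minimal numpy sketch of this sampling scheme, assuming surface points and normals are already available from the mesh (e.g. via trimesh); the gripper_length value is illustrative:

import numpy as np

def sample_grasp(point, normal, gripper_length=0.11):
    # Build a rotation whose z-axis is aligned with the surface normal.
    z = normal / np.linalg.norm(normal)
    x = np.cross(z, [0.0, 0.0, 1.0])
    if np.linalg.norm(x) < 1e-6:          # normal parallel to world z
        x = np.array([1.0, 0.0, 0.0])
    x /= np.linalg.norm(x)
    y = np.cross(z, x)
    R = np.stack([x, y, z], axis=1)
    # Random roll about the grasp's own z-axis.
    theta = np.random.uniform(0, 2 * np.pi)
    c, s = np.cos(theta), np.sin(theta)
    R = R @ np.array([[c, -s, 0.0], [s, c, 0.0], [0.0, 0.0, 1.0]])
    # Random standoff from the surface, sampled from [0, gripper_length].
    standoff = np.random.uniform(0, gripper_length)
    t = point + standoff * z
    return R, t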

Drawbacks

From a senior labmate: the main issue is this two-stage methodology of a proposing/sampling network followed by an evaluation network, which is dated. The two-stage pipeline is slow and cannot generate dense grasp poses; why split into two stages what can be done in one?

Vanilla VAE Code

To understand the paper's code in detail, let's first read through a vanilla VAE implementation.

from abc import abstractmethod
from typing import Any, List

import torch.nn as nn
from torch import Tensor


class BaseVAE(nn.Module):
    def __init__(self) -> None:
        super(BaseVAE, self).__init__()

    def encode(self, input: Tensor) -> List[Tensor]:
        raise NotImplementedError

    def decode(self, input: Tensor) -> Any:
        raise NotImplementedError

    def sample(self, batch_size: int, current_device: int, **kwargs) -> Tensor:
        raise NotImplementedError

    def generate(self, x: Tensor, **kwargs) -> Tensor:
        raise NotImplementedError

    @abstractmethod
    def forward(self, *inputs: Tensor) -> Tensor:
        pass

    @abstractmethod
    def loss_function(self, *inputs: Any, **kwargs) -> Tensor:
        pass

Compared with an ordinary nn.Module, we additionally need to define encode, decode, and sample functions.

In __init__, the encoder and decoder are simply constructed symmetrically.

modules = []
hidden_dims = [32, 64, 128, 256, 512]

# Build Encoder
for h_dim in hidden_dims:
    modules.append(
        nn.Sequential(
            nn.Conv2d(in_channels, out_channels=h_dim,
                      kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(h_dim),
            nn.LeakyReLU())
    )
    in_channels = h_dim

self.encoder = nn.Sequential(*modules)
self.fc_mu = nn.Linear(hidden_dims[-1] * 4, latent_dim)
self.fc_var = nn.Linear(hidden_dims[-1] * 4, latent_dim)

# Build Decoder
modules = []
self.decoder_input = nn.Linear(latent_dim, hidden_dims[-1] * 4)
hidden_dims.reverse()
for i in range(len(hidden_dims) - 1):
    modules.append(
        nn.Sequential(
            nn.ConvTranspose2d(hidden_dims[i],
                               hidden_dims[i + 1],
                               kernel_size=3,
                               stride=2,
                               padding=1,
                               output_padding=1),
            nn.BatchNorm2d(hidden_dims[i + 1]),
            nn.LeakyReLU())
    )

self.decoder = nn.Sequential(*modules)
self.final_layer = nn.Sequential(
    nn.ConvTranspose2d(hidden_dims[-1],
                       hidden_dims[-1],
                       kernel_size=3,
                       stride=2,
                       padding=1,
                       output_padding=1),
    nn.BatchNorm2d(hidden_dims[-1]),
    nn.LeakyReLU(),
    nn.Conv2d(hidden_dims[-1], out_channels=3,
              kernel_size=3, padding=1),
    nn.Tanh())

The encode and decode implementations are shown below. Since this repository assumes 64 x 64 x 3 images, the layer sizes need to be recomputed when working at a different resolution.
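Concretely, each of the five stride-2 convolutions halves the spatial size, so a square H x H input yields a flattened encoder feature of size 512 * (H/32)^2; for H = 64 that is 2048, which is why fc_mu and fc_var take hidden_dims[-1] * 4 inputs. A quick sanity check:

def flattened_feature_size(h, w, channels=512, num_stride2_convs=5):
    # A stride-2 conv with kernel 3 and padding 1 maps size s to ceil(s / 2).
    for _ in range(num_stride2_convs):
        h, w = (h + 1) // 2, (w + 1) // 2
    return channels * h * w

assert flattened_feature_size(64, 64) == 2048  # hidden_dims[-1] * 4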

def encode(self, input: Tensor) -> List[Tensor]:
    """
    Encodes the input by passing through the encoder network
    and returns the latent codes.
    :param input: (Tensor) Input tensor to encoder [B x C x H x W]
    :return: (Tensor) List of latent codes
    """
    result = self.encoder(input)
    # B * C * 64 * 64 -> B * 32 * 32 * 32 -> B * 64 * 16 * 16
    # -> B * 128 * 8 * 8 -> B * 256 * 4 * 4 -> B * 512 * 2 * 2
    result = torch.flatten(result, start_dim=1)
    # B * 2048

    # Split the result into mu and var components
    # of the latent Gaussian distribution
    mu = self.fc_mu(result)
    # B * latent_dim
    log_var = self.fc_var(result)
    # B * latent_dim
    return [mu, log_var]

def decode(self, z: Tensor) -> Tensor:
    """
    Maps the given latent codes onto the image space.
    :param z: (Tensor) [B x latent_dim]
    :return: (Tensor) [B x C x H x W]
    """
    result = self.decoder_input(z)
    # B * latent_dim => B * 2048
    result = result.view(-1, 512, 2, 2)
    # B * 512 * 2 * 2
    result = self.decoder(result)
    # B * 256 * 4 * 4 -> B * 128 * 8 * 8 -> B * 64 * 16 * 16 -> B * 32 * 32 * 32
    result = self.final_layer(result)
    # B * 32 * 64 * 64 -> B * 3 * 64 * 64
    return result

As we can see below, reparameterization just computes $z = \mu + \sigma\odot\varepsilon$; in the forward pass, we sample a $z$ this way and then run decode on it.

def reparameterize(self, mu: Tensor, logvar: Tensor) -> Tensor:
    """
    Reparameterization trick to sample from N(mu, var) using N(0, 1).
    :param mu: (Tensor) Mean of the latent Gaussian [B x D]
    :param logvar: (Tensor) Log-variance of the latent Gaussian [B x D]
    :return: (Tensor) [B x D]
    """
    std = torch.exp(0.5 * logvar)
    eps = torch.randn_like(std)
    return eps * std + mu

def forward(self, input: Tensor, **kwargs) -> List[Tensor]:
    mu, log_var = self.encode(input)
    z = self.reparameterize(mu, log_var)
    return [self.decode(z), input, mu, log_var]

def loss_function(self, *args, **kwargs) -> dict:
    recons = args[0]
    input = args[1]
    mu = args[2]
    log_var = args[3]
    # Reconstruction loss
    recons_loss = F.mse_loss(recons, input)
    # KL divergence to the standard normal prior
    kld_weight = kwargs['M_N']  # Account for the minibatch samples from the dataset
    kld_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim=1), dim=0)
    loss = recons_loss + kld_weight * kld_loss
    return {'loss': loss, 'Reconstruction_Loss': recons_loss.detach(), 'KLD': -kld_loss.detach()}

def sample(self, num_samples: int, current_device: int, **kwargs) -> Tensor:
    """
    Samples from the latent space and returns the corresponding
    image space map.
    :param num_samples: (Int) Number of samples
    :param current_device: (Int) Device to run the model
    :return: (Tensor)
    """
    z = torch.randn(num_samples, self.latent_dim)
    z = z.to(current_device)
    samples = self.decode(z)
    return samples

def generate(self, x: Tensor, **kwargs) -> Tensor:
    """
    Given an input image x, returns the reconstructed image.
    :param x: (Tensor) [B x C x H x W]
    :return: (Tensor) [B x C x H x W]
    """
    return self.forward(x)[0]

The KL divergence formula is: $KL(N(\mu, \sigma^2)\,\|\,N(0, 1)) = -\log \sigma + \frac{\sigma^2 + \mu^2}{2} - \frac{1}{2}$
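Summed over the $D$ independent latent dimensions, this is exactly the term computed in loss_function above:
$$
\mathrm{KL}=-\frac{1}{2}\sum_{j=1}^{D}\left(1+\log\sigma_j^2-\mu_j^2-\sigma_j^2\right)
$$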

6D-GraspNet Code

Grasp Sampler

In the code, GraspSamplerVAE and GraspSamplerGAN both inherit from GraspSampler, which provides the shared decoder functionality.

class GraspSampler(nn.Module):
    def __init__(self, latent_size, device):
        super(GraspSampler, self).__init__()
        self.latent_size = latent_size
        self.device = device

    def create_decoder(self, model_scale, pointnet_radius, pointnet_nclusters,
                       num_input_features):
        # The number of input features for the decoder is 3+latent space where 3
        # represents the x, y, z position of the point-cloud
        self.decoder = base_network(pointnet_radius, pointnet_nclusters,
                                    model_scale, num_input_features)
        self.q = nn.Linear(model_scale * 1024, 4)
        self.t = nn.Linear(model_scale * 1024, 3)
        self.confidence = nn.Linear(model_scale * 1024, 1)

    def decode(self, xyz, z):
        # Given a point cloud and a sampled latent z, predict the grasp
        # position t(x), orientation q(x) (a quaternion), and a confidence.
        # The latent is appended to every point as an extra feature.
        xyz_features = self.concatenate_z_with_pc(xyz, z).transpose(-1, 1).contiguous()
        for module in self.decoder[0]:
            xyz, xyz_features = module(xyz, xyz_features)
        # Run the PointNet++ backbone to get a B x (1024 * scale) feature vector
        x = self.decoder[1](xyz_features.squeeze(-1))
        predicted_qt = torch.cat(
            (F.normalize(self.q(x), p=2, dim=-1), self.t(x)), -1)
        return predicted_qt, torch.sigmoid(self.confidence(x)).squeeze()

    def concatenate_z_with_pc(self, pc, z):
        z.unsqueeze_(1)
        z = z.expand(-1, pc.shape[1], -1)
        return torch.cat((pc, z), -1)

    def get_latent_size(self):
        return self.latent_size


def base_network(pointnet_radius, pointnet_nclusters, scale, in_features):
    sa1_module = pointnet2.PointnetSAModule(
        npoint=pointnet_nclusters,
        radius=pointnet_radius,
        nsample=64,
        mlp=[in_features, 64 * scale, 64 * scale, 128 * scale])
    sa2_module = pointnet2.PointnetSAModule(
        npoint=32,
        radius=0.04,
        nsample=128,
        mlp=[128 * scale, 128 * scale, 128 * scale, 256 * scale])
    sa3_module = pointnet2.PointnetSAModule(
        mlp=[256 * scale, 256 * scale, 256 * scale, 512 * scale])

    sa_modules = nn.ModuleList([sa1_module, sa2_module, sa3_module])
    fc_layer = nn.Sequential(nn.Linear(512 * scale, 1024 * scale),
                             nn.BatchNorm1d(1024 * scale), nn.ReLU(True),
                             nn.Linear(1024 * scale, 1024 * scale),
                             nn.BatchNorm1d(1024 * scale), nn.ReLU(True))
    return nn.ModuleList([sa_modules, fc_layer])

The GraspSamplerVAE code below follows the same pattern and is relatively easy to understand.

class GraspSamplerVAE(GraspSampler):
    """Network for learning a generative VAE grasp-sampler."""
    # omit some functions

    def create_encoder(self, model_scale, pointnet_radius, pointnet_nclusters):
        # The number of input features for the encoder is 19: the x, y, z
        # position of the point-cloud and the flattened 4x4=16 grasp pose matrix,
        # i.e. a PointNet++ encoder that takes N x 19 inputs and outputs a
        # 1024-dim feature
        self.encoder = base_network(pointnet_radius, pointnet_nclusters, model_scale, 19)

    def create_bottleneck(self, input_size, latent_size):
        # Linear heads for the mean and log-variance vectors
        mu = nn.Linear(input_size, latent_size)
        logvar = nn.Linear(input_size, latent_size)
        self.latent_space = nn.ModuleList([mu, logvar])

    def encode(self, xyz, xyz_features):
        for module in self.encoder[0]:
            xyz, xyz_features = module(xyz, xyz_features)
        return self.encoder[1](xyz_features.squeeze(-1))

    def forward(self, pc, grasp=None, train=True):
        if train:
            return self.forward_train(pc, grasp)
        else:
            return self.forward_test(pc, grasp)

    def forward_train(self, pc, grasp):
        # During training, z must be sampled via reparameterization
        # so that gradients can flow back through the encoder.
        input_features = torch.cat(
            (pc, grasp.unsqueeze(1).expand(-1, pc.shape[1], -1)),
            -1).transpose(-1, 1).contiguous()
        z = self.encode(pc, input_features)
        mu, logvar = self.bottleneck(z)
        z = self.reparameterize(mu, logvar)
        qt, confidence = self.decode(pc, z)
        return qt, confidence, mu, logvar

    def forward_test(self, pc, grasp):
        # At test time, the mean can be used directly in place of the sampled z.
        input_features = torch.cat(
            (pc, grasp.unsqueeze(1).expand(-1, pc.shape[1], -1)),
            -1).transpose(-1, 1).contiguous()
        z = self.encode(pc, input_features)
        mu, _ = self.bottleneck(z)
        qt, confidence = self.decode(pc, mu)
        return qt, confidence

    def sample_latent(self, batch_size):
        return torch.randn(batch_size, self.latent_size).to(self.device)

    def generate_grasps(self, pc, z=None):
        # Used at inference: given a point cloud and a sampled z,
        # directly predict the corresponding grasp.
        if z is None:
            z = self.sample_latent(pc.shape[0])
        qt, confidence = self.decode(pc, z)
        return qt, confidence, z.squeeze()

    def generate_dense_latents(self, resolution):
        """
        For the VAE sampler we consider dense latents to correspond to those between -2 and 2
        """
        latents = torch.meshgrid(*[
            torch.linspace(-2, 2, resolution) for i in range(self.latent_size)
        ])
        return torch.stack([latents[i].flatten() for i in range(len(latents))],
                           dim=-1).to(self.device)

GraspSamplerGAN involves the approach of another paper, so it is not expanded on here.

GraspNet Evaluator

This part is really just a binary classifier on top of PointNet++ features.

class GraspEvaluator(nn.Module):
    def __init__(self,
                 model_scale=1,
                 pointnet_radius=0.02,
                 pointnet_nclusters=128,
                 device="cpu"):
        super(GraspEvaluator, self).__init__()
        self.create_evaluator(pointnet_radius, model_scale, pointnet_nclusters)
        self.device = device

    def create_evaluator(self, pointnet_radius, model_scale, pointnet_nclusters):
        # The number of input features for the evaluator is 4: the x, y, z
        # position of the concatenated gripper and object point-clouds and an
        # extra binary feature to tell the two point-clouds apart (in this
        # implementation, 1 for object points and 0 for gripper points)
        self.evaluator = base_network(pointnet_radius, pointnet_nclusters,
                                      model_scale, 4)
        self.predictions_logits = nn.Linear(1024 * model_scale, 1)
        self.confidence = nn.Linear(1024 * model_scale, 1)

    def evaluate(self, xyz, xyz_features):
        for module in self.evaluator[0]:
            xyz, xyz_features = module(xyz, xyz_features)
        return self.evaluator[1](xyz_features.squeeze(-1))

    def forward(self, pc, gripper_pc, train=True):
        # Merge the object point cloud with the gripper point cloud
        pc, pc_features = self.merge_pc_and_gripper_pc(pc, gripper_pc)
        x = self.evaluate(pc, pc_features.contiguous())
        # Run the classification logit and confidence heads
        return self.predictions_logits(x), torch.sigmoid(self.confidence(x))

    def merge_pc_and_gripper_pc(self, pc, gripper_pc):
        """
        Merges the object point cloud and gripper point cloud and
        adds a binary auxiliary feature that indicates whether each point
        belongs to the object or to the gripper.
        """
        pc_shape = pc.shape
        gripper_shape = gripper_pc.shape
        assert (len(pc_shape) == 3)
        assert (len(gripper_shape) == 3)
        assert (pc_shape[0] == gripper_shape[0])

        npoints = pc_shape[1]
        batch_size = pc_shape[0]

        # First concatenate the two point clouds
        l0_xyz = torch.cat((pc, gripper_pc), 1)
        labels = [
            torch.ones(pc.shape[1], 1, dtype=torch.float32),
            torch.zeros(gripper_pc.shape[1], 1, dtype=torch.float32)
        ]
        labels = torch.cat(labels, 0)
        labels.unsqueeze_(0)
        labels = labels.repeat(batch_size, 1, 1)
        # Append the object/gripper indicator as a per-point feature
        l0_points = torch.cat([l0_xyz, labels.to(self.device)],
                              -1).transpose(-1, 1)
        return l0_xyz, l0_points

Iterative Grasp Pose Refinement

This part of the code is more involved and also very important.

class GraspEstimator:
    """
    Includes the code used for running the inference.
    """
    def __init__(self, grasp_sampler_opt, grasp_evaluator_opt, opt):
        self.grasp_sampler_opt = grasp_sampler_opt
        self.grasp_evaluator_opt = grasp_evaluator_opt
        self.opt = opt
        self.target_pc_size = opt.target_pc_size
        self.num_refine_steps = opt.refine_steps
        self.refine_method = opt.refinement_method
        self.threshold = opt.threshold
        self.batch_size = opt.batch_size
        self.generate_dense_grasps = opt.generate_dense_grasps
        if self.generate_dense_grasps:
            self.num_grasps_per_dim = opt.num_grasp_samples
            self.num_grasp_samples = opt.num_grasp_samples * opt.num_grasp_samples
        else:
            self.num_grasp_samples = opt.num_grasp_samples
        self.choose_fn = opt.choose_fn
        self.choose_fns = {
            "all":
            None,
            "better_than_threshold":
            utils.choose_grasps_better_than_threshold,
            "better_than_threshold_in_sequence":
            utils.choose_grasps_better_than_threshold_in_sequence,
        }
        self.device = torch.device("cuda:0")
        self.grasp_evaluator = create_model(grasp_evaluator_opt)
        self.grasp_sampler = create_model(grasp_sampler_opt)

    def keep_inliers(self, grasps, confidences, z, pc, inlier_indices_list):
        for i, inlier_indices in enumerate(inlier_indices_list):
            grasps[i] = grasps[i][inlier_indices]
            confidences[i] = confidences[i][inlier_indices]
            z[i] = z[i][inlier_indices]
            pc[i] = pc[i][inlier_indices]

    def generate_and_refine_grasps(self, pc):
        pc_list, pc_mean = self.prepare_pc(pc)
        grasps_list, confidence_list, z_list = self.generate_grasps(pc_list)
        inlier_indices = utils.get_inlier_grasp_indices(grasps_list,
                                                        torch.zeros(1, 3).to(self.device),
                                                        threshold=1.0,
                                                        device=self.device)
        self.keep_inliers(grasps_list, confidence_list, z_list, pc_list, inlier_indices)
        improved_eulers, improved_ts, improved_success = [], [], []
        for pc, grasps in zip(pc_list, grasps_list):
            out = self.refine_grasps(pc, grasps, self.refine_method,
                                     self.num_refine_steps)
            improved_eulers.append(out[0])
            improved_ts.append(out[1])
            improved_success.append(out[2])
        improved_eulers = np.hstack(improved_eulers)
        improved_ts = np.hstack(improved_ts)
        improved_success = np.hstack(improved_success)
        if self.choose_fn == "all":
            selection_mask = np.ones(improved_success.shape, dtype=np.float32)
        else:
            selection_mask = self.choose_fns[self.choose_fn](improved_eulers,
                                                             improved_ts,
                                                             improved_success,
                                                             self.threshold)
        grasps = utils.rot_and_trans_to_grasps(improved_eulers, improved_ts,
                                               selection_mask)
        utils.denormalize_grasps(grasps, pc_mean)
        refine_indexes, sample_indexes = np.where(selection_mask)
        success_prob = improved_success[refine_indexes, sample_indexes].tolist()
        return grasps, success_prob

    def prepare_pc(self, pc):
        if pc.shape[0] > self.target_pc_size:
            pc = utils.regularize_pc_point_count(pc, self.target_pc_size)
        pc_mean = np.mean(pc, 0)
        pc -= np.expand_dims(pc_mean, 0)
        pc = np.tile(pc, (self.num_grasp_samples, 1, 1))
        pc = torch.from_numpy(pc).float().to(self.device)
        pcs = utils.partition_array_into_subarrays(pc, self.batch_size)
        return pcs, pc_mean

    def generate_grasps(self, pcs):
        all_grasps = []
        all_confidence = []
        all_z = []
        if self.generate_dense_grasps:
            # Sample latents on a dense grid in [-2, 2] instead of from the prior
            latent_samples = self.grasp_sampler.net.module.generate_dense_latents(
                self.num_grasps_per_dim)
            latent_samples = utils.partition_array_into_subarrays(
                latent_samples, self.batch_size)
            for latent_sample, pc in zip(latent_samples, pcs):
                grasps, confidence, z = self.grasp_sampler.generate_grasps(
                    pc, latent_sample)
                all_grasps.append(grasps)
                all_confidence.append(confidence)
                all_z.append(z)
        else:
            # Sample z from the standard Gaussian prior for each point cloud
            for pc in pcs:
                grasps, confidence, z = self.grasp_sampler.generate_grasps(pc)
                all_grasps.append(grasps)
                all_confidence.append(confidence)
                all_z.append(z)
        return all_grasps, all_confidence, all_z

    def refine_grasps(self, pc, grasps, refine_method, num_refine_steps=10):
        grasp_eulers, grasp_translations = utils.convert_qt_to_rt(grasps)
        if refine_method == "gradient":
            improve_fun = self.improve_grasps_gradient_based
            grasp_eulers = torch.autograd.Variable(grasp_eulers.to(self.device),
                                                   requires_grad=True)
            grasp_translations = torch.autograd.Variable(grasp_translations.to(self.device),
                                                         requires_grad=True)
        else:
            improve_fun = self.improve_grasps_sampling_based

        improved_success = []
        improved_eulers = []
        improved_ts = []
        improved_eulers.append(grasp_eulers.cpu().data.numpy())
        improved_ts.append(grasp_translations.cpu().data.numpy())
        last_success = None
        for i in range(num_refine_steps):
            # Improve each grasp's success probability via improve_fun
            success_prob, last_success = improve_fun(pc, grasp_eulers,
                                                     grasp_translations,
                                                     last_success)
            improved_success.append(success_prob.cpu().data.numpy())
            improved_eulers.append(grasp_eulers.cpu().data.numpy())
            improved_ts.append(grasp_translations.cpu().data.numpy())

        # we need to run the success on the final improved grasps
        grasp_pcs = utils.control_points_from_rot_and_trans(
            grasp_eulers, grasp_translations, self.device)
        improved_success.append(
            self.grasp_evaluator.evaluate_grasps(
                pc, grasp_pcs).squeeze().cpu().data.numpy())

        return np.asarray(improved_eulers), np.asarray(improved_ts), np.asarray(improved_success)

    def improve_grasps_gradient_based(self, pcs, grasp_eulers, grasp_trans, last_success):
        grasp_pcs = utils.control_points_from_rot_and_trans(
            grasp_eulers, grasp_trans, self.device)
        success = self.grasp_evaluator.evaluate_grasps(pcs, grasp_pcs)
        success.squeeze().backward(torch.ones(success.shape[0]).to(self.device))
        delta_t = grasp_trans.grad
        norm_t = torch.norm(delta_t, p=2, dim=-1).to(self.device)
        # Adjust the alpha so that it won't update more than 1 cm. Gradient is only valid
        # in small neighborhood.
        alpha = torch.min(0.01 / norm_t, torch.tensor(1.0).to(self.device))
        # Plain gradient ascent on the grasp's translation and orientation
        grasp_trans.data += grasp_trans.grad * alpha[:, None]
        grasp_eulers.data += grasp_eulers.grad * alpha[:, None]
        return success.squeeze(), None

    def improve_grasps_sampling_based(self, pcs, grasp_eulers, grasp_trans, last_success=None):
        with torch.no_grad():
            if last_success is None:
                grasp_pcs = utils.control_points_from_rot_and_trans(
                    grasp_eulers, grasp_trans, self.device)
                last_success = self.grasp_evaluator.evaluate_grasps(pcs, grasp_pcs)

            # The sampling-based variant draws a random perturbation of each grasp
            delta_t = 2 * (torch.rand(grasp_trans.shape).to(self.device) - 0.5)
            delta_t *= 0.02
            delta_euler_angles = (torch.rand(grasp_eulers.shape).to(self.device) - 0.5) * 2
            perturbed_translation = grasp_trans + delta_t
            perturbed_euler_angles = grasp_eulers + delta_euler_angles
            grasp_pcs = utils.control_points_from_rot_and_trans(
                perturbed_euler_angles, perturbed_translation, self.device)

            # Evaluate the perturbed grasps and accept each perturbation with
            # probability min(1, new_score / old_score)
            perturbed_success = self.grasp_evaluator.evaluate_grasps(pcs, grasp_pcs)
            ratio = perturbed_success / torch.max(last_success,
                                                  torch.tensor(0.0001).to(self.device))
            mask = torch.rand(ratio.shape).to(self.device) <= ratio
            next_success = last_success
            ind = torch.where(mask)[0]
            next_success[ind] = perturbed_success[ind]
            grasp_trans[ind].data = perturbed_translation.data[ind]
            grasp_eulers[ind].data = perturbed_euler_angles.data[ind]
        return last_success.squeeze(), next_success
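
Putting it together, inference could look roughly like the sketch below; the option objects come from the repo's argument parsing, which is omitted here, so treat the variable names as illustrative:

# Hypothetical driver code around GraspEstimator.
estimator = GraspEstimator(grasp_sampler_opt, grasp_evaluator_opt, opt)
object_pc = ...  # N x 3 numpy array from a depth camera, already segmented
grasps, success_prob = estimator.generate_and_refine_grasps(object_pc)
# grasps: refined 6D grasp poses; success_prob: evaluator scores for each grasp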