缺点

词表太大,量化(压缩)后才可行

参考地址

https://aistudio.baidu.com/projectdetail/8103098

训练验证

import os
from glob import glob
import cv2
import paddle
import faiss
from out_yolo_model import GPT as GPT13
import pandas as pd
import json
from tqdm import tqdm
import numpy as np
from paddle.io import DataLoader, Dataset
import warnings

warnings.filterwarnings('ignore')


# small vocabulary: 16 positions × 62 characters (0-9, a-z, A-Z) = 992 tokens
def gen_small_voc():
    """Build the small token vocabulary: token id -> "position_char".

    Covers 16 positions crossed with 62 characters (digits, lowercase,
    uppercase), giving 992 entries, numbered in position-major order.
    """
    chars = list("0123456789" + "qwertyuiopasdfghjklzxcvbnm" + "QWERTYUIOPASDFGHJKLZXCVBNM")
    tokens = ("{}_{}".format(pos, ch) for pos in range(16) for ch in chars)
    return {token_id: token for token_id, token in enumerate(tokens)}


def random_gen_voc():
    """Return a random composite key: 16 "pos_char" parts joined by '#'."""
    chars = list("0123456789" + "qwertyuiopasdfghjklzxcvbnm" + "QWERTYUIOPASDFGHJKLZXCVBNM")
    parts = []
    for pos in range(16):
        parts.append("{}_{}".format(pos, np.random.choice(chars)))
    return "#".join(parts)


def gen_text_voc_to_token_id():
    """Create the large detection vocabulary and assign each entry a unique
    random composite key, then persist the two-way mapping as a pickle.

    Entries encode a grid cell, box size and class:
    ``x_{cell}_w_{w}_h_{h}_class_{c}`` plus the two special markers.
    """
    tokens = ["x_{}_w_{}_h_{}_class_{}".format(x, w, h, c)
              for x in range(28 * 28)
              for w in range(28)
              for h in range(28)
              for c in range(15)]
    tokens.append("<|end|>")
    tokens.append("<|start|>")

    mapping = dict()
    for token in tqdm(tokens):
        # Draw random composite keys until an unused one turns up, then
        # record the association in both directions in the same dict.
        composite = random_gen_voc()
        while mapping.get(composite, None) is not None:
            composite = random_gen_voc()
        mapping[composite] = token
        mapping[token] = composite
    pd.to_pickle(mapping, "large_em_voc.pkl")


class MyDataSet(Dataset):
    """VisDrone detection dataset.

    Pairs annotation ``.txt`` files with their images and encodes each
    bounding box as a sequence of 16 small-vocabulary token ids via the
    pickled large-token mapping produced by ``gen_text_voc_to_token_id``.
    """

    def __init__(self):
        super(MyDataSet, self).__init__()

        # Pair every training annotation with its image by rewriting the path.
        txt = glob("D:/chromedownload/VisDrone2019-DET-train/annotations/*")
        image = glob("D:/chromedownload/VisDrone2019-DET-train/images/*")
        data_txt_image = []
        for one in txt:
            two = one.replace("D:/chromedownload/VisDrone2019-DET-train/annotations\\",
                              "D:/chromedownload/VisDrone2019-DET-train/images\\").replace(".txt", ".jpg")
            if two in image:
                data_txt_image.append((one, two))
        self.data = data_txt_image
        # Two-way map between large detection tokens and '#'-joined composites.
        # NOTE: "samll" is a pre-existing typo kept for compatibility.
        self.large_token_to_samll_token = pd.read_pickle("large_em_voc.pkl")
        # Invert id -> token into token -> id.
        self.small_token_to_token_id = gen_small_voc()
        self.small_token_to_token_id = {k: v for v, k in self.small_token_to_token_id.items()}

    def init_val(self):
        """Switch this dataset in place to the VisDrone test-dev split."""
        txt = glob("D:/chromedownload/VisDrone2019-DET-test-dev/annotations/*")
        image = glob("D:/chromedownload/VisDrone2019-DET-test-dev/images/*")
        data_txt_image = []
        for one in txt:
            two = one.replace("D:/chromedownload/VisDrone2019-DET-test-dev/annotations\\",
                              "D:/chromedownload/VisDrone2019-DET-test-dev/images\\").replace(".txt", ".jpg")
            if two in image:
                data_txt_image.append((one, two))
        self.data = data_txt_image
        self.large_token_to_samll_token = pd.read_pickle("large_em_voc.pkl")
        self.small_token_to_token_id = gen_small_voc()
        self.small_token_to_token_id = {k: v for v, k in self.small_token_to_token_id.items()}

    def __len__(self):
        # Number of (annotation, image) pairs.
        return len(self.data)

    def __getitem__(self, item):
        """Return (image, box tokens [n, 16], <|end|> ids, <|start|> ids)."""
        text, image = self.data[item]

        # Load and resize; dividing by 256 scales pixels roughly into [0, 1).
        image = cv2.imread(image)
        h, w, c = image.shape
        image = cv2.resize(image, (224, 224)) / 256
        text_df = pd.read_csv(text)

        # The annotation file has no header row, so read_csv consumed the
        # first box as column names; append it back before casting to float.
        text_df = pd.DataFrame(text_df.values.tolist() + [text_df.columns.values.tolist()]).astype("float")

        # Box centers rescaled to the 224x224 frame (columns: 0=x, 1=y, 2=w, 3=h).
        center_x = (text_df[0] + text_df[2] / 2) * 224 / w
        center_y = (text_df[1] + text_df[3] / 2) * 224 / h

        # Half width/height rescaled to 224 — presumably intentional halving
        # to keep values within the 28-bin vocabulary range; TODO confirm.
        center_w = text_df[2] / 2 * 224 / w
        center_h = text_df[3] / 2 * 224 / h
        # Quantize each center into one of 28x28 cells of 8x8 pixels;
        # xy_index is the flattened cell id (row-major over y then x).
        xy_index = 0
        center_x_y = np.zeros(center_x.size)
        for i in range(0, 224, 8):
            j = i + 8
            for ii in range(0, 224, 8):
                jj = ii + 8
                center_x_y[(ii <= center_x.values) * (center_x.values <= jj) * (i <= center_y.values) * (
                        center_y.values <= j)] = xy_index
                xy_index += 1
        text_df["xy"] = center_x_y
        text_df["w"] = center_w
        text_df["h"] = center_h

        # Sort boxes top-to-bottom, then left-to-right, in integer coordinates.
        text_df = text_df.astype(
            "int").sort_values([1, 0])

        # Cap at 128 boxes per image.
        text_df = text_df.iloc[:128]

        # Compose the large-token string per box; column 5 is the class id.
        xy = "x_" + text_df.astype("str")["xy"] + "_w_" + text_df.astype("str")["w"] + "_h_" + text_df.astype("str")[
            "h"] + "_class_" + text_df.astype("str")[5]
        xy = xy.values

        # Large token -> '#'-joined composite -> 16 small-token ids per box;
        # boxes missing from the mapping are silently dropped (`if jj`).
        text_token = [self.large_token_to_samll_token.get(xy_i) for xy_i in xy]
        text_token = [[self.small_token_to_token_id.get(j) for j in jj.split("#")] for jj in text_token if jj]

        text_token = np.array(text_token).reshape([-1, 16])

        return image, text_token, [self.small_token_to_token_id.get(i) for i in
                                   self.large_token_to_samll_token.get("<|end|>").split("#")], [
            self.small_token_to_token_id.get(i) for i in
            self.large_token_to_samll_token.get("<|start|>").split("#")]


def gn(items):
    """Collate function: stack images to NCHW and pad token sequences.

    Each item is (image HWC, token array [k, 16], end-token ids,
    start-token ids). Every sequence is prefixed with the start token and
    padded with the end token up to the longest sequence length plus one.
    """
    images = [x.transpose([2, 0, 1]).reshape([1, 3, 224, 224]) for x, _, _, _ in items]
    pad_len = max(y.shape[0] for _, y, _, _ in items) + 1

    texts = []
    for _, y, end_tok, start_tok in items:
        padded = np.concatenate([[start_tok], y, (pad_len - y.shape[0]) * [end_tok]])
        texts.append(padded.reshape([1, -1, 16]))
    return np.concatenate(images), np.concatenate(texts)


def val():
    """Compute the average teacher-forcing loss on the test-dev split.

    Loads the checkpoint "duo_yang_xing.pkl", runs one pass over the
    validation DataLoader and reports the running mean loss in the bar.
    """
    small_em_voc = gen_small_voc()

    model = GPT13(len(small_em_voc), 512, 32, 8)
    model.load_dict(paddle.load("duo_yang_xing.pkl"))
    model.eval()
    # Rough parameter count: weights as rows * cols, vectors as length.
    print("参数量:",
          sum([i.shape[0] * i.shape[-1] if len(i.shape) > 1 else i.shape[-1] for i in model.parameters()]) / 1000000000,
          "B")
    loss_func = paddle.nn.CrossEntropyLoss()

    bar = tqdm(range(1))
    batch_size = 5

    data_set = MyDataSet()
    data_set.init_val()
    data = DataLoader(data_set, batch_size=batch_size, shuffle=True, num_workers=5, collate_fn=gn)
    data_count = 0
    loss_list = []
    for epoch in bar:

        for image, text in data:
            try:
                # No gradients are needed during validation.
                with paddle.no_grad():
                    # Inputs are tokens [:-1]; targets are tokens [1:].
                    out, _ = model(text[:, :-1].astype("int64"), image.astype("float32"))
                    loss = loss_func(out, text[:, 1:].reshape([out.shape[0], -1]).astype("int64"))
                loss_list.append(loss.item())
                bar.set_description(
                    "epoch___{}__loss__{:.5f}___data_count__{}".format(epoch, np.mean(loss_list), data_count))

                data_count += batch_size

            except Exception as e:
                # The original bare `except:` silently swallowed every error
                # (including KeyboardInterrupt). Report the failure and free
                # GPU memory — the expected failure mode here is OOM.
                print("batch skipped:", e)
                paddle.device.cuda.empty_cache()


def eval_data():
    """Index every large-token embedding in faiss, then decode the model's
    first predicted box by nearest-neighbour lookup.

    The index is an inner-product index over L2-normalized embeddings,
    which is equivalent to cosine similarity.
    """
    small_em_voc = gen_small_voc()
    small_voc_em = {k: v for v, k in small_em_voc.items()}
    large_em_voc = pd.read_pickle("large_em_voc.pkl")

    model = GPT13(len(small_em_voc), 512, 32, 8)
    model.load_dict(paddle.load("duo_yang_xing.pkl"))
    model.eval()
    print("参数量:",
          sum([i.shape[0] * i.shape[-1] if len(i.shape) > 1 else i.shape[-1] for i in model.parameters()]) / 1000000000,
          "B")

    batch_size = 2

    # 8192 = 16 token slots * 512 hidden dims per composite key.
    faiss_index = faiss.IndexFlatIP(8192)
    key_list = []
    for i in tqdm(large_em_voc.keys()):
        # Only the composite '#'-joined keys; skip the reverse (token -> key) entries.
        if len(i) > 32 and "#" in i:
            ids = paddle.to_tensor([small_voc_em.get(ii) for ii in i.split("#")]).reshape([1, 1, -1])
            out_em = model.embedding(ids).reshape([1, -1])
            # BUG FIX: faiss requires contiguous float32 numpy arrays; the
            # original passed paddle tensors straight to add()/search().
            out_em = out_em.numpy().astype("float32")
            out_em /= np.linalg.norm(out_em, axis=-1, keepdims=True)
            faiss_index.add(out_em)
            key_list.append(large_em_voc.get(i))

    data_set = MyDataSet()
    data_set.init_val()
    data = DataLoader(data_set, batch_size=batch_size, shuffle=True, num_workers=5, collate_fn=gn)

    for image, text in data:
        out, _ = model(text[:, :-1].astype("int64"), image.astype("float32"))
        # Embed the first predicted box (16 token ids) of the first sample.
        pred_ids = paddle.argmax(out, -1).reshape([batch_size, -1, 16])[0, 0].reshape([1, 1, 16])
        out_em = model.embedding(pred_ids).reshape([1, -1])
        out_em = out_em.numpy().astype("float32")
        out_em /= np.linalg.norm(out_em, axis=-1, keepdims=True)
        di, index_index = faiss_index.search(out_em, 10)
        # Print the large token closest to the prediction.
        print(key_list[index_index[0, 0]])



def train():
    """Train the detection GPT on VisDrone with Adam, checkpointing every
    1000 samples and at the end of every epoch to "duo_yang_xing.pkl"."""
    small_em_voc = gen_small_voc()

    model = GPT13(len(small_em_voc), 512, 32, 8)
    # model.load_dict(paddle.load("duo_yang_xing.pkl"))  # uncomment to resume
    print("参数量:",
          sum([i.shape[0] * i.shape[-1] if len(i.shape) > 1 else i.shape[-1] for i in model.parameters()]) / 1000000000,
          "B")
    loss_func = paddle.nn.CrossEntropyLoss()
    opt = paddle.optimizer.Adam(parameters=model.parameters(), learning_rate=0.0003)

    bar = tqdm(range(200))
    batch_size = 5

    data_set = MyDataSet()
    data = DataLoader(data_set, batch_size=batch_size, shuffle=True, num_workers=5, collate_fn=gn)
    data_count = 0

    for epoch in bar:

        for image, text in data:
            try:
                # Teacher forcing: inputs are tokens [:-1], targets are [1:].
                out, _ = model(text[:, :-1].astype("int64"), image.astype("float32"))
                loss = loss_func(out, text[:, 1:].reshape([out.shape[0], -1]).astype("int64"))
                bar.set_description(
                    "epoch___{}__loss__{:.5f}___data_count__{}".format(epoch, loss.item(), data_count))
                opt.clear_grad()
                loss.backward()
                opt.step()
                data_count += batch_size
                # Periodic checkpoint (batch_size divides 1000, so this fires).
                if data_count % 1000 == 0:
                    paddle.save(model.state_dict(), "duo_yang_xing.pkl")
                    paddle.device.cuda.empty_cache()

            except Exception as e:
                # The original bare `except:` silently dropped failing batches
                # and hid real bugs. Report the error, then free GPU memory —
                # the expected failure mode here is OOM.
                print("batch skipped:", e)
                paddle.device.cuda.empty_cache()

        paddle.save(model.state_dict(), "duo_yang_xing.pkl")
    paddle.save(model.state_dict(), "duo_yang_xing.pkl")


if __name__ == '__main__':
    # Pipeline stages — run once in order, keeping only the stage needed:
    # gen_text_voc_to_token_id()  # 1) build and pickle the token vocabulary
    # train()                     # 2) train the model
    # val()                       # 3) validation loss on test-dev
    eval_data()

模型

import math

import paddle
import paddle.nn as nn


class MaxState(paddle.nn.Layer):
    """Windowed running-max token mixer used in place of self-attention.

    The sequence is processed in windows of size ``win``. Within a window a
    causally-masked cumulative maximum is taken per head; a per-head ``state``
    (the running maximum so far) is carried across windows, which also allows
    incremental decoding.
    """

    def __init__(self, hidden_dim, heads, win):
        super(MaxState, self).__init__()

        assert hidden_dim % heads == 0, "Hidden size must be divisible by the number of heads."

        # Feature size per head.
        self.head_size = hidden_dim // heads
        # Single projection applied before mixing.
        self.head = paddle.nn.Linear(hidden_dim, hidden_dim, bias_attr=False)
        self.head_num = heads
        self.win = win
        self.hidden = hidden_dim
        # Upper-triangular 0/1 mask used to make each window position see
        # only itself and earlier rows (via paddle.where below).
        self.mask = paddle.triu(paddle.ones([win, win]))

    def forward(self, input_data, state=None):
        """Mix ``input_data`` ([b, s, hidden]); return (mixed, carried state)."""
        b, s, k, h, w = input_data.shape[0], input_data.shape[1], self.head_num, self.head_size, self.win

        window = paddle.ones([1, w])

        out = self.head(input_data)

        # Broadcast each feature across the window axis: [b, s, hidden, w].
        out = out.unsqueeze(-1) @ window

        out = out.transpose([0, 2, 1, 3])

        one_list = []
        if state is None:
            # Start from -inf so the first maximum comes purely from the data.
            state = paddle.ones([out.shape[0], out.shape[1], 1, 1]) * float("-inf")
        for i in range(0, s, w):
            j = w + i
            one = out[:, :, i:j]
            _, _, r, c = one.shape
            if r != self.win:
                # Final, shorter window: crop the mask to the remaining rows.
                one = paddle.where(self.mask[:r, :], one, paddle.to_tensor(-float('inf')))
            else:
                one = paddle.where(self.mask, one, paddle.to_tensor(-float('inf')))

            # Append the carried state row, then take the columnwise maximum.
            one = paddle.concat([one, state @ window], axis=2)
            state = paddle.max(one, axis=2, keepdim=True)
            one = state.reshape([b, k, h, w])
            # Carry only the last column's maximum into the next window.
            state = state[..., -1:]
            if r != self.win:
                one = one[..., :r]

            one = one.transpose([0, 3, 1, 2])
            one_list.append(one)
        out = paddle.concat(one_list, 1)
        out = out.reshape([b, s, -1])

        return out, state


class FeedForward(nn.Layer):
    """SiLU-gated feed-forward block with 2x hidden expansion."""

    def __init__(self, hidden_size):
        super(FeedForward, self).__init__()
        # Up-projection, down-projection and gate (names kept for state_dict
        # compatibility with existing checkpoints).
        self.ffn1 = nn.Linear(hidden_size, hidden_size * 2)
        self.ffn2 = nn.Linear(hidden_size * 2, hidden_size)
        self.gate = nn.Linear(hidden_size, hidden_size * 2)
        self.relu = nn.Silu()

    def forward(self, x):
        # ffn2(ffn1(x) * silu(gate(x)))
        gated = self.ffn1(x) * self.relu(self.gate(x))
        return self.ffn2(gated)


class RMSNorm(nn.Layer):
    """Root-mean-square layer normalization with a learnable gain."""

    def __init__(self, dim, eps: float = 1e-6):
        super(RMSNorm, self).__init__()
        self.eps = eps
        # Per-channel gain, initialized to one.
        self.fc = paddle.create_parameter(shape=[dim], dtype='float32',
                                          default_initializer=nn.initializer.Constant(value=1.0))

    def norm(self, x):
        # x / sqrt(mean(x^2) + eps), computed over the last axis.
        mean_sq = x.pow(2).mean(-1, keepdim=True)
        return x * paddle.rsqrt(mean_sq + self.eps)

    def forward(self, x):
        return self.norm(x) * self.fc


class GPTDecoderLayer(nn.Layer):
    """One decoder layer: MaxState mixer + gated FFN, each with a residual
    connection followed by a normalization."""

    def __init__(self, hidden_size, num_heads):
        super(GPTDecoderLayer, self).__init__()
        # self.self_attention = MaskMultiHeadAttention(hidden_size, num_heads)
        self.self_attention = MaxState(hidden_size, num_heads, 8)
        self.ffn = FeedForward(hidden_size)
        self.norm = nn.LayerNorm(hidden_size)
        self.norm1 = RMSNorm(hidden_size)

    def forward(self, x, state=None, seq_len=None):
        mixed, state = self.self_attention(x, state)
        x = self.norm(mixed + x)           # mixer residual + LayerNorm
        x = self.norm1(self.ffn(x) + x)    # FFN residual + RMSNorm
        return x, state


class PositionalEncoding(nn.Layer):
    """Classic sinusoidal positional encoding added to input embeddings.

    Args:
        d_model: embedding dimension.
        max_len: maximum supported sequence length.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = paddle.arange(max_len).unsqueeze(1).astype("float32")
        div_term = paddle.exp(paddle.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = paddle.zeros([max_len, d_model])
        pe[:, 0::2] = paddle.sin(position * div_term)
        pe[:, 1::2] = paddle.cos(position * div_term)
        # BUG FIX: the original stored `pe` as a plain attribute despite the
        # comment saying it should be a buffer — so it never followed the
        # layer across devices nor appeared in the state dict. Register it
        # as a non-trainable buffer.  Shape: [1, max_len, d_model].
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x, seq_len=None):
        """Add positional encodings to x ([batch, seq, d_model]).

        When seq_len is None the first x.shape[1] positions are added;
        otherwise only the single encoding at position seq_len - 1 is added
        (incremental decoding).
        """
        if seq_len is None:
            return x + self.pe[:, :x.shape[1], :]
        return x + self.pe[:, seq_len - 1:seq_len, :]


# %%

def sinusoidal_position_embedding(max_len, output_dim):
    """Return (sin, cos) RoPE tables, each of shape [max_len, output_dim // 2]."""
    positions = paddle.arange(0, max_len, dtype="float32").unsqueeze(-1)  # [max_len, 1]
    dims = paddle.arange(0, output_dim // 2, dtype="float32")             # i in [0, d/2)
    # angle = pos / 10000^(2i/d)
    angles = positions * 10000 ** (-2 * dims / output_dim)
    return paddle.sin(angles), paddle.cos(angles)


def rope(q, sin_em, cos_em, seq_len=None):
    """Apply rotary position embedding (RoPE) to q ([b, heads, seq, dim])."""
    if seq_len is None:
        # Full sequence: one table row per position.
        sin_em = sin_em[:q.shape[2]]
        cos_em = cos_em[:q.shape[2]]
    else:
        # Incremental decoding: only the current position's row.
        sin_em = sin_em[seq_len - 1:seq_len]
        cos_em = cos_em[seq_len - 1:seq_len]

    # View the last axis as (pairs, 2): even/odd interleaved components.
    pairs = q.reshape([q.shape[0], q.shape[1], q.shape[2], -1, 2])
    odd = pairs[..., 1]
    even = pairs[..., 0]
    # Rotate each (even, odd) pair by the position angle; reshaping back
    # restores the interleaved layout.
    rotated = paddle.stack([even * cos_em - odd * sin_em,
                            odd * cos_em + even * sin_em], -1)
    return rotated.reshape(q.shape)


class ConvEm(nn.Layer):
    """Small convolutional image embedder: two 3x3 conv blocks with a
    residual connection, followed by 4x4 max pooling."""

    def __init__(self, hidden_size):
        super(ConvEm, self).__init__()
        channels = hidden_size // 16
        # First conv block: RGB -> channels.
        self.conv1 = nn.Conv2D(in_channels=3, out_channels=channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2D(channels)
        # Second conv block: channels -> channels, used residually.
        self.conv2 = nn.Conv2D(in_channels=channels, out_channels=channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2D(channels)

    def forward(self, im):
        features = nn.functional.relu(self.bn1(self.conv1(im)))
        features = nn.functional.relu(self.bn2(self.conv2(features)) + features)
        # Downsample spatially by 4 in each dimension.
        return paddle.nn.functional.max_pool2d(features, 4)


class GPT(nn.Layer):
    """Decoder-only model over 16-part composite tokens with an image prefix.

    Each detection token is a run of 16 small-vocabulary ids; the image is
    embedded by ConvEm and fused into the token embeddings via an
    elementwise max before the decoder stack.
    """

    def __init__(self, vocab_size, hidden_size, num_heads, num_layers):
        super(GPT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        # NOTE(review): label_embedding appears unused in forward — confirm.
        self.label_embedding = nn.Embedding(vocab_size, hidden_size)

        self.decoder_layers = nn.LayerList([GPTDecoderLayer(hidden_size, num_heads) for _ in range(num_layers)])
        self.fc = nn.Linear(hidden_size, vocab_size, bias_attr=False)
        # RoPE tables sized for the per-head pair dimension.
        self.sin_em, self.cos_em = sinusoidal_position_embedding(50000, hidden_size // num_heads // 2)
        # 1 -> 16 channel Conv1D expanding each hidden vector into the 16
        # per-token output slots.
        self.conv = paddle.nn.Conv1D(1, 16, kernel_size=3, padding=1, bias_attr=False)
        # NOTE(review): self.out appears unused in forward — confirm.
        self.out = nn.Linear(16, 16, bias_attr=False)

        self.layer_nor = paddle.nn.LayerNorm(hidden_size)
        # self.rms_norm=RMSNorm(hidden_size)
        self.cv_state = ConvEm(hidden_size)

    def forward(self, xx, image, state=None, seq_len=None):
        # xx: token ids — presumably [batch, seq, 16]; TODO confirm against gn().
        xx = self.embedding(xx)
        # x = self.position_embedding(x, seq_len)
        # Image -> conv features, broadcast across every token position and
        # fused with the token embeddings by an elementwise max.
        image=self.cv_state(image)
        image =image.reshape([image.shape[0],xx.shape[-1],-1])
        image=image.reshape([image.shape[0],image.shape[1],-1]).unsqueeze(-1)+paddle.zeros([1,1,1,xx.shape[1]])
        image =image.transpose([0,3,2,1])
        x = paddle.max(paddle.concat([xx,image],-2), -2)

        if state is None:
            # One carried MaxState per decoder layer.
            state = [None] * len(self.decoder_layers)

        i = 0
        # Rotary position embedding applied per head, added residually.
        x = rope(x.reshape([x.shape[0], x.shape[1], -1, self.sin_em.shape[1] * 2]).transpose([0, 2, 1, 3]),
                 self.sin_em,
                 self.cos_em, seq_len).transpose([0, 2, 1, 3]).reshape(x.shape) + x
        for decoder_layer in self.decoder_layers:
            x1, state[i] = decoder_layer(x, state[i])
            x = x1 + x
            i += 1

        # out = self.fc(self.rms_norm(x))
        # Expand each position into 16 output slots, add the input token
        # embeddings as a residual, then project to the vocabulary.
        out = self.conv(x.reshape([-1, 1, x.shape[-1]])) + xx.reshape([-1, 16, x.shape[-1]])
        out = out.reshape([x.shape[0], -1, x.shape[-1]])
        out = self.fc(self.layer_nor(out))
        return out, state

解析

这段代码定义了一个基于 PaddlePaddle 的 GPT 模型,包含了多个自定义的神经网络层和前向传播逻辑。下面是逐行解析:

import math
import paddle
import paddle.nn as nn

导入 Python 的数学库、PaddlePaddle 深度学习框架以及 PaddlePaddle 的神经网络相关模块。

class MaxState(paddle.nn.Layer):
    def __init__(self, hidden_dim, heads, win):
        super(MaxState, self).__init__()
        # 确保隐藏层维度能够被头数整除
        assert hidden_dim % heads == 0, "Hidden size must be divisible by the number of heads."
        # 计算每个头的尺寸
        self.head_size = hidden_dim // heads
        # 定义一个线性层
        self.head = paddle.nn.Linear(hidden_dim, hidden_dim, bias_attr=False)
        # 头的数量
        self.head_num = heads
        # 窗口大小
        self.win = win
        # 隐藏层维度
        self.hidden = hidden_dim
        # 创建上三角矩阵作为掩码
        self.mask = paddle.triu(paddle.ones([win, win]))
    def forward(self, input_data, state=None):
        # 获取输入数据的维度信息
        b, s, k, h, w = input_data.shape[0], input_data.shape[1], self.head_num, self.head_size, self.win
        # 创建一个窗口向量
        window = paddle.ones([1, w])
        # 通过线性层处理输入数据
        out = self.head(input_data)
        # 执行矩阵乘法
        out = out.unsqueeze(-1) @ window
        # 调整输出的维度
        out = out.transpose([0, 2, 1, 3])
        # 初始化一个列表来保存处理后的窗口数据
        one_list = []
        # 如果没有状态,则初始化状态
        if state is None:
            state = paddle.ones([out.shape[0], out.shape[1], 1, 1]) * float("-inf")
        # 遍历输入数据以窗口大小进行切片
        for i in range(0, s, w):
            j = w + i
            one = out[:, :, i:j]
            # 获取当前窗口的尺寸
            _, _, r, c = one.shape
            # 如果窗口尺寸不等于预设的win,则应用掩码
            if r != self.win:
                one = paddle.where(self.mask[:r, :], one, paddle.to_tensor(-float('inf')))
            else:
                one = paddle.where(self.mask, one, paddle.to_tensor(-float('inf')))
            # 将状态与窗口向量相乘并拼接
            one = paddle.concat([one, state @ window], axis=2)
            # 计算窗口内的最大值作为新的状态
            state = paddle.max(one, axis=2, keepdim=True)
            # 调整状态的形状
            one = state.reshape([b, k, h, w])
            state = state[..., -1:]
            # 如果窗口尺寸不等于预设的win,则裁剪输出
            if r != self.win:
                one = one[..., :r]
            # 调整输出的维度并添加到列表中
            one = one.transpose([0, 3, 1, 2])
            one_list.append(one)
        # 将所有窗口的数据拼接起来
        out = paddle.concat(one_list, 1)
        # 调整输出的形状
        out = out.reshape([b, s, -1])
        # 返回处理后的输出和状态
        return out, state

MaxState 类定义了一个自定义的神经网络层,它似乎用于处理输入数据的窗口并计算每个窗口的最大状态。

class FeedForward(nn.Layer):
    def __init__(self, hidden_size):
        super(FeedForward, self).__init__()
        # 定义两个线性层
        self.ffn1 = nn.Linear(hidden_size, hidden_size * 2)
        self.ffn2 = nn.Linear(hidden_size * 2, hidden_size)
        # 定义门控机制
        self.gate = nn.Linear(hidden_size, hidden_size * 2)
        # 定义激活函数
        self.relu = nn.Silu()
    def forward(self, x):
        # 通过第一个线性层
        x1 = self.ffn1(x)
        # 通过门控机制和激活函数
        x2 = self.relu(self.gate(x))
        # 元素乘
        x = x1 * x2
        # 通过第二个线性层
        x = self.ffn2(x)
        # 返回输出
        return x

FeedForward 类定义了一个前馈神经网络层,它包含两个线性层和一个门控机制,以及一个激活函数。这个前馈网络用于 GPT 模型中的每个解码器层。

class RMSNorm(nn.Layer):
    def __init__(self, dim, eps: float = 1e-6):
        super(RMSNorm, self).__init__()
        self.eps = eps
        # 创建一个可学习的参数,初始化为1.0
        self.fc = paddle.create_parameter(shape=[dim], dtype='float32',
                                          default_initializer=nn.initializer.Constant(value=1.0))
    def norm(self, x):
        # 计算 RMSNorm
        return x * paddle.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
    def forward(self, x):
        # 应用 RMSNorm 并乘以可学习的参数
        output = self.norm(x)
        return output * self.fc

RMSNorm 类实现了 RMSNorm 归一化,这是一种在自然语言处理模型中常用的归一化技术。

class GPTDecoderLayer(nn.Layer):
    def __init__(self, hidden_size, num_heads):
        super(GPTDecoderLayer, self).__init__()
        # 自我注意力层
        # self.self_attention = MaskMultiHeadAttention(hidden_size, num_heads)
        self.self_attention = MaxState(hidden_size, num_heads, 8)
        # 前馈网络
        self.ffn = FeedForward(hidden_size)
        # 层归一化
        self.norm = nn.LayerNorm(hidden_size)
        # RMSNorm 归一化
        self.norm1 = RMSNorm(hidden_size)
    def forward(self, x, state=None, seq_len=None):
        # 自我注意力层的前向传播
        x1, state = self.self_attention(x, state)
        # 残差连接和层归一化
        x = x1 + x
        x = self.norm(x)
        # 前馈网络的前向传播
        x = self.ffn(x) + x
        # 残差连接和 RMSNorm 归一化
        x = self.norm1(x)
        # 返回输出和状态
        return x, state

GPTDecoderLayer 类定义了 GPT 模型中的一个解码器层,它包含自我注意力层、前馈网络和两种归一化层。

class PositionalEncoding(nn.Layer):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # 创建位置编码
        position = paddle.arange(max_len).unsqueeze(1).astype("float32")
        div_term = paddle.exp(paddle.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = paddle.zeros([max_len, d_model])
        pe[:, 0::2] = paddle.sin(position * div_term)
        pe[:, 1::2] = paddle.cos(position * div_term)
        self.pe = pe.unsqueeze(0)  # Shape: [1, max_len, d_model]
        # 将位置编码注册为缓冲区(非可训练参数)
    def forward(self, x, seq_len=None):
        # 如果没有提供序列长度,则使用整个位置编码
        if seq_len is None:
            seq_len = x.shape[1]
            return x + self.pe[:, :seq_len, :]
        else:
            return x + self.pe[:, seq_len - 1:seq_len, :]

PositionalEncoding 类实现了位置编码,这是一种在序列模型中常用的技术,用于给模型提供关于输入序列中单词顺序的信息。

def sinusoidal_position_embedding(max_len, output_dim):
    # 创建正弦和余弦位置嵌入
    position = paddle.arange(0, max_len, dtype="float32").unsqueeze(-1)
    ids = paddle.arange(0, output_dim // 2, dtype="float32")
    theta = 10000 ** (-2 * ids / output_dim)
    embeddings = position * theta
    sin_embeddings = paddle.sin(embeddings)
    cos_embeddings = paddle.cos(embeddings)
    return sin_embeddings, cos_embeddings

sinusoidal_position_embedding 函数实现了正弦和余弦位置嵌入的计算。

def rope(q, sin_em, cos_em, seq_len=None):
    # 应用旋转位置嵌入
    if seq_len is None:
        sin_em = sin_em[:q.shape[2]]
        cos_em = cos_em[:q.shape[2]]
    else:
        sin_em = sin_em[seq_len - 1:seq_len]
        cos_em = cos_em[seq_len - 1:seq_len]
    # 执行旋转操作
    q1 = q.reshape([q.shape[0], q.shape[1], q.shape[2], -1, 2])[..., 1]
    q2 = q.reshape([q.shape[0], q.shape[1], q.shape[2], -1, 2])[..., 0]
    # 奇数负值*sin_em+偶数正值*cos_em  奇数正值*cos_em+偶数正值*sin_em
    q3 = paddle.stack([-q1 * sin_em + q2 * cos_em, q1 * cos_em + q2 * sin_em], -1)
    q = q3.reshape(q.shape)  # reshape后就是正负交替了
    return q

rope 函数实现了旋转位置嵌入(RoPE),这是一种改进的位置编码方法,它通过对嵌入向量进行旋转来编码位置信息。

class ConvEm(nn.Layer):
    def __init__(self, hidden_size):
        super(ConvEm, self).__init__()
        # 定义卷积层
        self.conv1 = nn.Conv2D(in_channels=3, out_channels=hidden_size//16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2D(hidden_size//16)
        # 定义第二个卷积层
        self.conv2 = nn.Conv2D(in_channels=hidden_size//16, out_channels=hidden_size//16, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2D(hidden_size//16)
    def forward(self, im):
        # 通过第一个卷积块
        x = nn.functional.relu(self.bn1(self.conv1(im)))
        # 通过第二个卷积块
        x = self.bn2(self.conv2(x))+x
        # 应用ReLU激活函数
        x = nn.functional.relu(x)
        return paddle.nn.functional.max_pool2d(x,4)

ConvEm 类定义了一个卷积神经网络,用于处理图像数据,提取特征,并将其转换为与 GPT 模型兼容的嵌入向量。

class GPT(nn.Layer):
    def __init__(self, vocab_size, hidden_size, num_heads, num_layers):
        super(GPT, self).__init__()
        # 定义词嵌入层
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        # 定义标签嵌入层
        self.label_embedding = nn.Embedding(vocab_size, hidden_size)
        # 定义解码器层列表
        self.decoder_layers = nn.LayerList([GPTDecoderLayer(hidden_size, num_heads) for _ in range(num_layers)])
        # 定义输出层的线性层
        self.fc = nn.Linear(hidden_size, vocab_size, bias_attr=False)
        # 创建正弦和余弦位置嵌入
        self.sin_em, self.cos_em = sinusoidal_position_embedding(50000, hidden_size // num_heads // 2)
        # 定义卷积层
        self.conv = paddle.nn.Conv1D(1, 16, kernel_size=3, padding=1, bias_attr=False)
        # 定义输出层的线性层
        self.out = nn.Linear(16, 16, bias_attr=False)
        # 定义层归一化
        self.layer_nor = paddle.nn.LayerNorm(hidden_size)
        # 定义RMSNorm归一化
        # self.rms_norm=RMSNorm(hidden_size)
        # 定义卷积状态层
        self.cv_state = ConvEm(hidden_size)
    def forward(self, xx, image, state=None, seq_len=None):
        # 通过词嵌入层
        xx = self.embedding(xx)
        # 通过卷积状态层处理图像数据
        image=self.cv_state(image)
        image =image.reshape([image.shape[0],xx.shape[-1],-1])
        image=image.reshape([image.shape[0],image.shape[1],-1]).unsqueeze(-1)+paddle.zeros([1,1,1,xx.shape[1]])
        image =image.transpose([0,3,2,1])
        x = paddle.max(paddle.concat([xx,image],-2), -2)
        if state is None:
            state = [None] * len(self.decoder_layers)
        i = 0
        # 应用旋转位置嵌入
        x = rope(x.reshape([x.shape[0], x.shape[1], -1, self.sin_em.shape[1] * 2]).transpose([0, 2, 1, 3]),
                 self.sin_em,
                 self.cos_em, seq_len).transpose([0, 2, 1, 3]).reshape(x.shape) + x
        # 通过解码器层列表
        for decoder_layer in self.decoder_layers:
            x1, state[i] = decoder_layer(x, state[i])
            x = x1 + x
            i += 1
        # 通过输出层的线性层
        out = self.fc(self.layer_nor(x))
        return out, state

在 GPT 类的 forward 方法中,最后一个步骤是通过输出层的线性层将解码器层的输出映射到词汇表的大小。然后,该函数返回最终的输出和状态。
整个 GPT 模型通过这些自定义层和位置编码,以及旋转位置嵌入(RoPE)等技术,实现了对输入序列的编码和解码,从而能够生成或预测序列中的下一个单词。
需要注意的是,这段代码可能需要根据具体的 PaddlePaddle 版本和环境进行调整,以确保代码的正确性和兼容性。此外,由于代码较长,可能存在一些错误或者不完整的部分,因此在实际使用前需要仔细检查和调试。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部