import math
from glob import glob

import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import pandas as pd
from tqdm import tqdm

from sklearn.metrics import f1_score,confusion_matrix


class MaxState(paddle.nn.Layer):
    """Windowed running-maximum layer with a state carried across windows.

    The input is linearly projected; then, independently for every feature
    channel, each sequence position receives the maximum of the projected
    values at that position and at all earlier positions (plus the incoming
    ``state``). The sequence is processed in chunks of ``win`` positions, and
    the running maximum is handed from one chunk to the next through ``state``.

    Args:
        hidden_dim: Size of the last input dimension; must be divisible by
            ``heads``.
        heads: Number of heads the feature dimension is split into when the
            per-window result is reshaped.
        win: Number of sequence positions processed per chunk.
    """

    def __init__(self, hidden_dim, heads, win):
        super(MaxState, self).__init__()

        assert hidden_dim % heads == 0, "Hidden size must be divisible by the number of heads."

        self.head_size = hidden_dim // heads
        self.head = paddle.nn.Linear(hidden_dim, hidden_dim, bias_attr=False)
        self.head_num = heads
        self.win = win
        self.hidden = hidden_dim
        # Upper-triangular (row <= column) pattern: inside a window, row t may
        # contribute to column c only when t <= c, which makes the max causal.
        self.mask = paddle.triu(paddle.ones([win, win]))

    def forward(self, input_data, state=None):
        """Apply the projection and the causal running maximum.

        Args:
            input_data: Tensor indexed as ``[batch, seq, hidden_dim]``.
            state: Optional running maximum from a previous call, shaped
                ``[batch, hidden_dim, 1, 1]``. When ``None`` it starts at
                ``-inf`` so it never wins the maximum.

        Returns:
            A tuple ``(out, state)`` where ``out`` is reshaped to
            ``[batch, seq, -1]`` and ``state`` is the running maximum after
            the last processed position, shaped ``[batch, hidden_dim, 1, 1]``.
        """
        b, s = input_data.shape[0], input_data.shape[1]
        k, h, w = self.head_num, self.head_size, self.win

        out = self.head(input_data)

        # Multiplying by a [1, w] row of ones replicates every value w times
        # along a new trailing axis: [b, s, hidden] -> [b, s, hidden, w].
        window = paddle.ones([1, w], dtype=out.dtype)
        out = out.unsqueeze(-1) @ window

        # [b, s, hidden, w] -> [b, hidden, s, w]
        out = out.transpose([0, 2, 1, 3])

        if state is None:
            state = paddle.full([out.shape[0], out.shape[1], 1, 1], float("-inf"), dtype=out.dtype)

        # paddle.where expects a boolean condition; cast once per call and keep
        # the fill value in the activation dtype so both branches agree.
        mask = self.mask.astype("bool")
        neg_inf = paddle.to_tensor(float("-inf"), dtype=out.dtype)

        one_list = []
        for i in range(0, s, w):
            one = out[:, :, i:i + w]
            # r < w only for a trailing, partially filled window.
            r = one.shape[2]

            # Hide "future" rows from each column; mask[:r] also covers r == w.
            one = paddle.where(mask[:r, :], one, neg_inf)

            # Append the carried maximum as one extra row, then reduce over
            # rows so column c holds max(state, values at rows 0..c).
            one = paddle.concat([one, state @ window], axis=2)
            state = paddle.max(one, axis=2, keepdim=True)
            one = state.reshape([b, k, h, w])
            # The last column has seen every row of this window, so it is the
            # running maximum handed to the next window.
            state = state[..., -1:]
            if r != w:
                # Drop columns that have no matching sequence position.
                one = one[..., :r]

            # [b, k, h, r] -> [b, r, k, h]
            one = one.transpose([0, 3, 1, 2])
            one_list.append(one)
        out = paddle.concat(one_list, 1)
        out = out.reshape([b, s, -1])

        return out, state




class FeedForward(nn.Layer):
    """Gated feed-forward block computing ``ffn2(ffn1(x) * silu(gate(x)))``.

    Args:
        hidden_size: Size of the input and output feature dimension; the
            inner projections are twice as wide.
    """

    def __init__(self, hidden_size):
        super(FeedForward, self).__init__()

        inner_size = hidden_size * 2
        self.ffn1 = nn.Linear(hidden_size, inner_size)
        self.ffn2 = nn.Linear(inner_size, hidden_size)
        self.gate = nn.Linear(hidden_size, inner_size)
        # Despite the attribute name, the activation is SiLU, not ReLU.
        self.relu = nn.Silu()

    def forward(self, x):
        """Return the gated projection of ``x`` back at ``hidden_size`` width."""
        gated = self.ffn1(x) * self.relu(self.gate(x))
        return self.ffn2(gated)


class RMSNorm(nn.Layer):
    """Root-mean-square normalisation over the last axis with a learned scale.

    Args:
        dim: Length of the learned per-feature scale vector.
        eps: Constant added to the mean square before the reciprocal square
            root, for numerical stability.
    """

    def __init__(self, dim, eps: float = 1e-6):
        super(RMSNorm, self).__init__()
        self.eps = eps
        # Per-feature scale, initialised to one.
        ones_init = nn.initializer.Constant(value=1.0)
        self.fc = paddle.create_parameter(shape=[dim], dtype='float32',
                                          default_initializer=ones_init)

    def norm(self, x):
        """Divide ``x`` by the root mean square of its last axis."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        inv_rms = paddle.rsqrt(mean_square + self.eps)
        return x * inv_rms

    def forward(self, x):
        """Normalise ``x`` and multiply by the learned scale."""
        return self.norm(x) * self.fc


class GPTDecoderLayer(nn.Layer):
    """Decoder block: MaxState mixing and a gated feed-forward, each with a residual.

    Args:
        hidden_size: Feature dimension of the input and output.
        num_heads: Number of heads passed to ``MaxState``.
        win: Window size passed to ``MaxState``. Defaults to 8, the value that
            was previously hard-coded.
    """

    def __init__(self, hidden_size, num_heads, win=8):
        super(GPTDecoderLayer, self).__init__()
        # MaxState takes the place a self-attention module would occupy.
        self.self_attention = MaxState(hidden_size, num_heads, win)
        self.ffn = FeedForward(hidden_size)
        self.norm = nn.LayerNorm(hidden_size)
        self.norm1 = RMSNorm(hidden_size)

    def forward(self, x, state=None, seq_len=None):
        """Run the block.

        Args:
            x: Input tensor, forwarded to ``MaxState``.
            state: Optional running state for ``MaxState``; ``None`` starts fresh.
            seq_len: Accepted for interface compatibility; not used here.

        Returns:
            A tuple ``(x, state)`` with the transformed tensor and the updated
            ``MaxState`` state.
        """
        # Sequence mixing with a residual connection, then LayerNorm.
        x1, state = self.self_attention(x, state)
        x = x1 + x
        x = self.norm(x)

        # Feed-forward with a residual connection, then RMSNorm.
        x = self.ffn(x) + x
        x = self.norm1(x)
        return x, state


class PositionalEncoding(nn.Layer):
    """Additive sinusoidal positional encoding.

    Args:
        d_model: Feature dimension of the tensors the encoding is added to.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # Positions 0..max_len-1 as a float column vector, shape [max_len, 1].
        position = paddle.arange(max_len, dtype="float32").unsqueeze(1)
        # One frequency per even feature index; built directly as float32
        # rather than relying on implicit integer-to-float promotion.
        div_term = paddle.exp(
            paddle.arange(0, d_model, 2, dtype="float32") * (-math.log(10000.0) / d_model))
        angles = position * div_term

        pe = paddle.zeros([max_len, d_model])
        pe[:, 0::2] = paddle.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angles to match; for even d_model this slice is a no-op.
        pe[:, 1::2] = paddle.cos(angles[:, :d_model // 2])
        # Kept as a plain tensor attribute (not a parameter or registered
        # buffer), so it does not appear in the layer's state dict.
        self.pe = pe.unsqueeze(0)  # Shape: [1, max_len, d_model]

    def forward(self, x, seq_len=None):
        """Add positional encodings to ``x``.

        Args:
            x: Tensor of shape ``[batch_size, seq_len, d_model]``.
            seq_len: When ``None``, positions ``0..x.shape[1]-1`` are added.
                Otherwise only the encoding of position ``seq_len - 1`` is
                added, broadcast over the sequence axis (presumably for
                step-by-step decoding with a single new token - confirm
                against the caller).

        Returns:
            ``x`` with the selected encodings added.
        """
        if seq_len is None:
            seq_len = x.shape[1]
            return x + self.pe[:, :seq_len, :]
        else:
            return x + self.pe[:, seq_len - 1:seq_len, :]


# %%

def sinusoidal_position_embedding(max_len, output_dim):
    """Build sine and cosine tables for rotary position encoding.

    Args:
        max_len: Number of positions to precompute.
        output_dim: Dimension ``d`` in the formula; ``output_dim // 2``
            frequencies are produced.

    Returns:
        A tuple ``(sin, cos)`` of tensors, each of shape
        ``(max_len, output_dim // 2)``.
    """
    # Frequency index i from the formula, i = 0 .. output_dim // 2 - 1.
    freq_index = paddle.arange(0, output_dim // 2, dtype="float32")
    inv_freq = 10000 ** (-2 * freq_index / output_dim)
    # Positions as a column vector, shape (max_len, 1).
    pos = paddle.arange(0, max_len, dtype="float32").unsqueeze(-1)
    # pos / 10000^(2i/d), shape (max_len, output_dim // 2).
    angles = pos * inv_freq
    return paddle.sin(angles), paddle.cos(angles)


def rope(q, sin_em, cos_em, seq_len=None):
    """Rotate adjacent feature pairs of ``q`` by position-dependent angles.

    Args:
        q: 4-D tensor whose axis 2 is the sequence axis and whose last axis
            has even length; the last axis is viewed as (even, odd) pairs.
        sin_em: Sine table indexed by position along its first axis.
        cos_em: Cosine table indexed by position along its first axis.
        seq_len: When ``None``, the first ``q.shape[2]`` table rows are used.
            Otherwise only the row for position ``seq_len - 1`` is used and
            broadcast over the sequence axis.

    Returns:
        A tensor with the same shape as ``q`` holding the rotated values.
    """
    if seq_len is None:
        length = q.shape[2]
        sin_em, cos_em = sin_em[:length], cos_em[:length]
    else:
        sin_em, cos_em = sin_em[seq_len - 1:seq_len], cos_em[seq_len - 1:seq_len]

    # View the last axis as pairs: index 0 is the even element, 1 the odd one.
    pairs = q.reshape([q.shape[0], q.shape[1], q.shape[2], -1, 2])
    odd, even = pairs[..., 1], pairs[..., 0]

    # 2-D rotation of each (even, odd) pair by the positional angle.
    rotated_even = -odd * sin_em + even * cos_em
    rotated_odd = odd * cos_em + even * sin_em

    # Stacking on a new last axis and reshaping interleaves the pairs again.
    return paddle.stack([rotated_even, rotated_odd], -1).reshape(q.shape)


class CvEm(nn.Layer):
    """Convolutional embedding: 1-D convolution followed by an axis swap.

    Args:
        hidden_size: Number of output channels of the convolution.
    """

    def __init__(self, hidden_size):
        super(CvEm, self).__init__()
        # 3 input channels, kernel size 3, padding 2.
        self.embedding = nn.Conv1D(in_channels=3, out_channels=hidden_size, kernel_size=3, padding=2)

    def forward(self, x):
        """Convolve ``x`` and swap its last two axes."""
        features = self.embedding(x)
        return paddle.transpose(features, perm=[0, 2, 1])


class GPT(nn.Layer):
    """Stack of ``GPTDecoderLayer`` blocks over a convolutional embedding.

    The output is computed from the maximum over axis 1 of the final hidden
    states, so one vector of size ``vocab_size`` is returned per batch item.

    Args:
        vocab_size: Output dimension of the final linear layer.
        hidden_size: Feature dimension used throughout the model.
        num_heads: Number of heads passed to every decoder layer.
        num_layers: Number of decoder layers.
    """

    def __init__(self, vocab_size, hidden_size, num_heads, num_layers):
        super(GPT, self).__init__()
        self.embedding = CvEm(hidden_size)

        layers = [GPTDecoderLayer(hidden_size, num_heads) for _ in range(num_layers)]
        self.decoder_layers = nn.LayerList(layers)
        self.fc = nn.Linear(hidden_size, vocab_size, bias_attr=False)
        # Rotary tables precomputed for 50000 positions.
        rope_dim = hidden_size // num_heads // 2
        self.sin_em, self.cos_em = sinusoidal_position_embedding(50000, rope_dim)

        self.layer_nor = paddle.nn.LayerNorm(hidden_size)

    def forward(self, x, state=None, seq_len=None):
        """Run the model.

        Args:
            x: Input passed to the convolutional embedding.
            state: Optional list with one ``MaxState`` state per decoder
                layer. A supplied list is updated in place. ``None`` starts
                every layer from a fresh state.
            seq_len: Forwarded to ``rope`` to select the position row(s).

        Returns:
            A tuple ``(out, state)`` with the model output and the list of
            per-layer states.
        """
        x = self.embedding(x)

        if state is None:
            state = [None] * len(self.decoder_layers)

        # Split the feature axis into groups as wide as two rotary table rows,
        # rotate them, restore the layout and add the result to the embedding.
        group_dim = self.sin_em.shape[1] * 2
        grouped = x.reshape([x.shape[0], x.shape[1], -1, group_dim]).transpose([0, 2, 1, 3])
        rotated = rope(grouped, self.sin_em, self.cos_em, seq_len)
        x = rotated.transpose([0, 2, 1, 3]).reshape(x.shape) + x

        for idx, decoder_layer in enumerate(self.decoder_layers):
            layer_out, state[idx] = decoder_layer(x, state[idx])
            # Extra residual connection around the whole decoder layer.
            x = layer_out + x

        # Max over axis 1, then LayerNorm and the output projection.
        pooled = paddle.max(x, 1)
        return self.fc(self.layer_nor(pooled)), state

这段代码实现了一个基于PaddlePaddle的GPT(Generative Pre-trained Transformer)模型。主要包括以下几个部分:

  1. 引入依赖库:引入了一些需要使用的库,包括math、glob、numpy、paddle等。

  2. 定义MaxState类:这是一个自定义的PaddlePaddle层,在模型中代替自注意力机制。它先对输入做一次线性变换,然后沿序列方向按固定窗口大小逐块计算累计最大值(每个位置取该位置及其之前所有位置的最大值),并通过state在窗口之间以及多次调用之间传递当前的最大值状态。

  3. 定义FeedForward类:这是一个前馈神经网络层,用于对输入数据进行非线性变换。

  4. 定义RMSNorm类:这是一个归一化层,用于对输入数据进行归一化处理。

  5. 定义GPTDecoderLayer类:这是一个GPT解码器层,依次包括MaxState层(代替自注意力机制)、LayerNorm、前馈神经网络和RMSNorm,其中MaxState层和前馈神经网络都带有残差连接。

  6. 定义PositionalEncoding类:这是一个正弦/余弦位置编码层,用于为输入数据添加位置信息。在这段代码中,GPT类并没有使用该层。

  7. 定义sinusoidal_position_embedding函数:这是一个用于生成正弦位置编码和余弦位置编码的函数。

  8. 定义rope函数:这是一个对输入数据施加旋转位置编码(RoPE)的函数。它把最后一维按相邻两个元素分为一组,并用对应位置的正弦值和余弦值对每一组做二维旋转。

  9. 定义CvEm类:这是一个卷积嵌入层,使用一维卷积(输入通道数为3,卷积核大小为3)把输入映射到hidden_size个通道,然后交换最后两个维度。

  10. 定义GPT类:这是GPT模型的定义,包括卷积嵌入层、旋转位置编码、多个解码器层,以及对序列维度取最大值后经过LayerNorm和全连接层得到输出的部分。

  11. forward函数:这是GPT模型的前向传播函数,用于计算输出结果。

总体来说,这段代码实现了一个基于PaddlePaddle的GPT模型,并提供了相应的层和函数用于构建和训练模型。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部