1、下载spark源码并编译

mkdir -p /home/bigdata && cd /home/bigdata

wget https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3.tgz

解压文件

tar -zxf spark-3.4.3.tgz 

cd spark-3.4.3

wget https://raw.githubusercontent.com/apache/incubator-celeborn/v0.4.0-incubating/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
git apply Celeborn_Dynamic_Allocation_spark3_4.patch

源码构建编译

./dev/make-distribution.sh --name lukeyan --pip --tgz -Dhadoop.version=3.3.6 -Phive -Phive-thriftserver -Pkubernetes -Pvolcano
 

编译成功

构建完成的进行解压操作并添加相应的jar文件

解压编译的文件

tar -zxvf spark-3.4.3-bin-lukeyan.tgz 

cd spark-3.4.3-bin-lukeyan
 

添加jar文件

cd jars/
 

ls
wget  https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.4.3/spark-hadoop-cloud_2.12-3.4.3.jar
wget  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-cloud-storage/3.3.6/hadoop-cloud-storage-3.3.6.jar
wget  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar
wget https://maven.aliyun.com/repository/public/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar
# 添加 Paimon集成相关依赖
wget  https://repo1.maven.org/maven2/org/apache/paimon/paimon-spark-3.4/0.9.0/paimon-spark-3.4-0.9.0.jar
# 如果Kubernetes 的发行版使用的是 K3s 、RKE2等,还需要加入以下依赖
wget  https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-jdk18on/1.77/bcpkix-jdk18on-1.77.jar
wget  https://repo1.maven.org/maven2/org/bouncycastle/bcprov-jdk18on/1.77/bcprov-jdk18on-1.77.jar
cd ..
 

构建docker镜像

docker buildx build --load --platform linux/arm64 --tag spark-paimon-s3:3.4.3_2.12 .

查看镜像架构

docker inspect --format '{{.Architecture}}' azul/zulu-openjdk:17.0.9-17.46.19-jre

docker images
docker save -o jdk.tar azul/zulu-openjdk:17.0.9-17.46.19-jre
docker save -o flink.tar flink:1.19-scala_2.12-java17
docker pull --platform linux/arm64 azul/zulu-openjdk:17.0.9-17.46.19-jre
docker inspect --format '{{.Architecture}}' azul/zulu-openjdk:17.0.9-17.46.19-jre
docker buildx ls
 

x86上构建Arm镜像参考地址Centos7的x86上构建arm镜像docker_centos7 arm镜像-CSDN博客

将Dockerfile拷贝到当前目录下

FROM azul/zulu-openjdk:17.0.9-17.46.19-jre
ARG spark_uid=185

ENV HADOOP_CONF_DIR=/etc/hadoop/conf


# Before building the docker image, first build and make a Spark distribution following
# the instructions in https://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

RUN set -ex && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps net-tools && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/* && rm -rf /var/lib/apt/lists/*

COPY jars /opt/spark/jars
# Copy RELEASE file if exists
COPY RELEAS[E] /opt/spark/RELEASE
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data


ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh

ENTRYPOINT [ "/opt/entrypoint.sh" ]

# Specify the User that the actual main process will run as
USER ${spark_uid}

执行构建镜像的命令

docker buildx build --load --platform linux/arm64 --tag spark-paimon-s3:3.4.3_2.12 .

得到基础镜像spark-paimon-s3:3.4.3_2.12

参考地址ApachePaimon 实践系列1-环境准备 (qq.com)
 

2、编写程序 

KafkaSparkPaimonS3

使用spark读取消费kafka,将固定格式的数据保存到S3协议的对象存储上,

这里s3使用了Minio

程序代码

package com.example.cloud;

import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object KafkaSparkPaimonS3 {
  def main(args: Array[String]): Unit = {
    val kafkaConsumer: String = "kafka-service:9092"
    val kafkaTopic: String = "mysql-flink-cdc-kafka"
    val startingOffsets: String = "latest"
    val kafkaGroupId: String = "KafkaSparkPaimonS3Group"
    val failOnDataLoss: Boolean = false
    val maxOffsetsPerTrigger: Int = 3000
    val lakePath: String = "s3a://paimon/warehouse"
    val checkpointLocation: String = "s3a://spark/checkpoints"
    val s3endpoint: String = "http://minio:9000"
    val s3access: String = "uotAvnxXwcz90yNxWhq2"
    val s3secret: String = "MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v"
    val schema_base = StructType(List(
      StructField("before", StringType),
      StructField("after", StringType),
      StructField("source", MapType(StringType, StringType)),
      StructField("op", StringType),
      StructField("ts_ms", LongType),
      StructField("transaction", StringType)
    ))
    println("create spark session ..........................................................")
    val sparkConf = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("sspark.sql.catalog.paimon.metastore", "filesystem")
      .config("spark.sql.catalog.paimon.warehouse", lakePath)
      .config("spark.sql.catalog.paimon.s3.endpoint", s3endpoint)
      .config("spark.sql.catalog.paimon.s3.access-key", s3access)
      .config("spark.sql.catalog.paimon.s3.secret-key", s3secret)
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.s3.path-style.access", "true")
      .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .config("spark.sql.catalog.paimon.s3.path-style.access", "true")
      .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
      .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.access.key", s3access)
      .config("spark.hadoop.fs.s3a.secret.key", s3secret)
      .config("spark.hadoop.fs.s3a.endpoint", s3endpoint)
      .config("spark.hadoop.fs.s3a.connection.timeout", "200000")
    val sparkSession: SparkSession = sparkConf.getOrCreate()
    println("get spark DataStreamReader start  ..........................................................")
    val dsr: DataStreamReader = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConsumer)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", startingOffsets)
      .option("failOnDataLoss", failOnDataLoss)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .option("kafka.group.id", kafkaGroupId)
      .option("includeHeaders", "true")
    println("get spark DataStreamReader end  ..........................................................")
    val df: DataFrame = dsr.load()
    println("配置kafka消费流 spark DataFrame end  ..........................................................")
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val frame: Dataset[Row] = df.select(from_json('value.cast("string"), schema_base) as "value").select($"value.*")
      .alias("data")
      .select(
        get_json_object($"data.after", "$.uuid").as("uuid"),
        get_json_object($"data.after", "$.product").as("product"),
        get_json_object($"data.after", "$.promotion").as("promotion"),
        get_json_object($"data.after", "$.value_added_service").as("value_added_service"),
        get_json_object($"data.after", "$.logistics").as("logistics"),
        get_json_object($"data.after", "$.weight").as("weight"),
        get_json_object($"data.after", "$.color").as("color"),
        get_json_object($"data.after", "$.version").as("version"),
        get_json_object($"data.after", "$.shop").as("shop"),
        get_json_object($"data.after", "$.evaluate").as("evaluate"),
        get_json_object($"data.after", "$.order_num").as("order_num"),
        get_json_object($"data.after", "$.rider").as("rider"),
        get_json_object($"data.after", "$.order_time").as("order_time"),
        get_json_object($"data.after", "$.create_time").as("create_time"),
        get_json_object($"data.after", "$.pay_price").as("pay_price"),
        get_json_object($"data.after", "$.pay_type").as("pay_type"),
        get_json_object($"data.after", "$.address").as("address")
      )
    println("get spark Dataset from kafka  ..........................................................")
    sparkSession.sql("USE paimon;")
    println("spark engine use paimon catalog ..........................................................")
    sparkSession.sql("create database m31094;")
    println("create my favourite database for u ..........................................................")
    val tablePath = "paimon.m31094.my_table"
    println("create table to store data  ..........................................................")
    sparkSession.sql("use m31094;")
    sparkSession.sql(
      s"""
          CREATE TABLE IF NOT EXISTS $tablePath (
              uuid STRING,
              product STRING,
              promotion STRING,
              value_added_service STRING,
              logistics STRING,
              weight STRING,
              color STRING,
              version STRING,
              shop STRING,
              evaluate STRING,
              order_num STRING,
              rider STRING,
              order_time STRING,
              create_time STRING,
              pay_price STRING,
              pay_type STRING,
              address STRING
          ) TBLPROPERTIES (
                'partitioned_by' = 'uuid'
            )
      """)
    println("将 DataFrame 写入 Paimon 表  ..........................................................")

    println("尽可能的详细打印数据吧哈哈哈哈 ..........................................................")

    val query: StreamingQuery = frame //是一个已经创建的 Dataset[Row],通常是从流数据源(如 Kafka、文件等)获得的数据。
      .writeStream //开始一个流式写入操作。
      .foreachBatch { (batchDF: Dataset[Row], batchId: Long) =>
        println(s"处理批量流的UID是 batch ID: $batchId")
        // 打印当前批次的数据
        println("莫醒醒..........................................................")
        batchDF.show(truncate = false) // 设置 truncate = false 以完整显示列内容
      }
      .format("paimon")
      //指定数据输出格式为 Paimon。
      .option("write.merge-schema", "true")
      //允许在写入时合并模式(schema),即动态更新表的模式以适应新数据。
      .option("write.merge-schema.explicit-cast", "true")
      //在合并模式时,明确转换数据类型,以确保兼容性和正确性。
      .outputMode("append")
      //指定输出模式为追加模式,表示只将新的数据行添加到目标表中,不会更新或删除已有的数据。
      .option("checkpointLocation", checkpointLocation)
      //设置检查点位置,这对于流处理非常重要,有助于在故障恢复时重新启动流处理任务。
      .start("s3a://paimon/warehouse/m31094.db/my_table") //启动流式查询并将数据写入指定的 S3 路径
    println("spark流通过paimon方式写入数据湖 ..........................................................")
    println("查看数据内容和结构  ..........................................................")
    println(df.schema) // 打印 Schema
    println("打印 Schema  ..........................................................")
    println("Stream processing started...")
    query.awaitTermination() //使当前线程等待,直到流查询结束。这意味着程序会持续运行,直到手动停止或出现错误。
    println("流处理已结束,程序终止。")
  }
}
 

 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.cloud</groupId>
    <artifactId>KafkaSparkPaimonS3</artifactId>
    <version>2.4.5</version>
    <name>KafkaSparkPaimonS3</name>
    <properties>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.4.1</spark.version>
        <paimon.version>0.9.0</paimon.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-bundle</artifactId>
            <version>1.12.367</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-spark-common</artifactId>
            <version>${paimon.version}</version>
        </dependency>
       <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-s3</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-spark-3.4</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-s3-impl</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-text</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>audience-annotations</artifactId>
                    <groupId>org.apache.yetus</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-token-provider-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.20</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.cloud.KafkaSparkPaimonS3</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <excludeTransitive>false</excludeTransitive>
                            <stripVersion>false</stripVersion>
                            <includeScope>runtime</includeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                            <outputDirectory>
                                ${project.build.directory}/config
                            </outputDirectory>
                            <resources>
                                <resource>
                                    <directory>src/main/resources/</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>copy-sh</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                            <outputDirectory>
                                ${project.build.directory}
                            </outputDirectory>
                            <resources>
                                <resource>
                                    <directory>bin/</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 

Dockerfile

FROM spark-paimon-s3:3.4.3_2.12
RUN mkdir -p /opt/spark/examples/jars
COPY target /opt/spark/examples/jars  

构建镜像的命令

docker buildx build --load --platform linux/arm64 --tag  spark-paimon-s3-app:3.4.3_2.12 --no-cache .
docker save -o spark-paimon-s3-app.tar spark-paimon-s3-app:3.4.3_2.12 

3、配置minio

minio.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
        - name: minio
          image: minio/minio:latest
          imagePullPolicy: IfNotPresent
          args:
            - server
            - /data
          env:
            - name: MINIO_ROOT_USER
              value: "admin"
            - name: MINIO_ROOT_PASSWORD
              value: "密码"
          command:
            - /bin/sh
            - -c
            - minio server /data --console-address ":5000"
          ports:
            - name: api
              protocol: TCP
              containerPort: 9000
            - name: ui
              protocol: TCP
              containerPort: 5000
          volumeMounts:
            - name: minio-storage
              mountPath: /data
      volumes:
        - name: minio-storage
          persistentVolumeClaim:
            claimName: minio-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: minio
  namespace: default
spec:
  selector:
    app: minio
  type: NodePort
  ports:
    - name: api
      protocol: TCP
      port: 9000
      targetPort: 9000
    - name: ui
      protocol: TCP
      port: 5000
      targetPort: 5000

minio-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: minio-pvc  # PVC 的名称
  namespace: default
spec:
  accessModes:
    - ReadWriteMany  # 访问模式,此处为单节点读写
  resources:
    requests:
      storage: 100Gi  # 请求的存储容量大小
  storageClassName: nfs-client  # 存储类,根据需要选择 

4、运行程序

4.1、springboot -mysql产生原始数据

产生的MySQL原始数据

4.2 数据从MySQL到kafka

mysql->flink cdc->kafka

MysqlFlinkCdcToKafka

在k8s上提交flink任务

/home/d/flink/bin/flink run-application --target kubernetes-application -Dkubernetes.namespace=default -Dkubernetes.cluster-id=flink-cdc-mysql -Dkubernetes.container.image.ref=flinkcdctokafka:0.1-snapshot -Dkubernetes.container.image.pull-policy=IfNotPresent -Dkubernetes.service-account=default -Dkubernetes.rest-service.exposed.type=NodePort -Djobmanager.memory.process.size=2048mb -Dtaskmanager.memory.process.size=2024mb -Dtaskmanager.numberOfTaskSlots=1 -Dhigh-availability.type=kubernetes -Dhigh-availability.storageDir=s3a://flink-cdc/recovery -Dstate.checkpoints.dir=s3a://flink-cdc/flink_cp -Dstate.savepoints.dir=s3a://flink-cdc/flink_sp -Dstate.backend.incremental=true -Ds3.access-key=uotAvnxXwcz90yNxWhq2 -Ds3.secret-key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v -Ds3.path.style.access=true -Ds3.endpoint=http://minio:9000 -Duser.timezone=Asia/Shanghai -c "com.example.cloud.MysqlFlinkCdcToKafka" local:///opt/flink/usrlib/MysqlFlinkCdcToKafka-jar-with-dependencies.jar

通过flink cdc将MySQL的数据写入到kafka的指定topic 

4.3 kafka到minio

kafka-spark-minio

spark提交命令,提交spark任务到k8s集群中运行

/opt/streaming/spark-3.4.3-bin-hadoop3/bin/spark-submit --name KafkaSparkPaimonS3 --master spark://10.10.10.99:7077 --deploy-mode client --driver-cores 2 --driver-memory 4g --num-executors 2 --executor-cores 2 --executor-memory 4g --class com.example.cloud.KafkaSparkPaimonS3 --conf spark.driver.extraClassPath=/opt/streaming/spark-3.4.3-bin-hadoop3/jars --conf spark.executor.extraClassPath=/opt/streaming/spark-3.4.3-bin-hadoop3/jars --jars /opt/lib/kafka-clients-3.8.0.jar,/opt/lib/spark-sql-kafka-0-10_2.13-3.4.3.jar,/opt/lib/spark-token-provider-kafka-0-10_2.13-3.4.3.jar /opt/KafkaSparkPaimonS3-jar-with-dependencies.jar

本地spark运行,可以通过spark sql查询数据的情况

本地执行spark-sql

/opt/streaming/spark-3.4.3-bin-hadoop3/bin/spark-sql --jars /opt/lib/paimon-spark-3.4-0.9.0.jar --conf 'spark.sql.catalog.paimon.metastore=filesystem' --conf 'spark.sql.catalog.paimon.warehouse=s3a://paimon/warehouse' --conf 'spark.sql.catalog.paimon.s3.endpoint=http://10.10.10.99:31212' --conf 'spark.sql.catalog.paimon.s3.access-key=uotAvnxXwcz90yNxWhq2' --conf 'spark.sql.catalog.paimon.s3.secret-key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog' --conf 'spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions' --conf 'spark.sql.catalog.paimon.s3.path-style.access=true' --conf 'spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore' --conf 'spark.hadoop.fs.s3a.multipart.size=104857600' --conf 'spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem' --conf 'spark.hadoop.fs.s3a.access.key=uotAvnxXwcz90yNxWhq2' --conf 'spark.hadoop.fs.s3a.secret.key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v' --conf 'spark.hadoop.fs.s3a.endpoint=http://10.10.10.99:31212' --conf 'spark.hadoop.fs.s3a.connectiopaimonn.timeout=200000'

 执行上面的本地spark-sql,开启spark终端后

use paimon;

use databases;

5、运行效果

 6、minio上存储

flink数据同步

 k8s上部署的容器服务

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部