无法使用PySpark插入SQL,但可以在SQL中使用

编程入门 行业动态 更新时间:2024-10-25 13:25:17
本文介绍了无法使用PySpark插入SQL,但可以在SQL中使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我使用以下代码在SQL中创建了一个表:

I have created a table below in SQL using the following:

CREATE TABLE [dbo].[Validation]( [RuleId] [int] IDENTITY(1,1) NOT NULL, [AppId] [varchar](255) NOT NULL, [Date] [date] NOT NULL, [RuleName] [varchar](255) NOT NULL, [Value] [nvarchar](4000) NOT NULL )

注意身份密钥(RuleId)

NOTE the identity key (RuleId)

在SQL中按如下所示将值插入表中时,它会起作用:

When inserting values into the table as below in SQL it works:

注意:如果表为空且递增,则不按原样插入主键会自动填充

Note: Not inserting the Primary Key as is will autofill if table is empty and increment

INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')

但是,在数据块上创建临时表并在下面的PySpark上运行以下查询时,执行相同的查询时,如下所示:

However when creating a temp table on databricks and executing the same query below running this query on PySpark as below:

%python driver = <Driver> url = "jdbc:sqlserver:<URL>" database = "<db>" table = "dbo.Validation" user = "<user>" password = "<pass>" #import the data remote_table = spark.read.format("jdbc")\ .option("driver", driver)\ .option("url", url)\ .option("database", database)\ .option("dbtable", table)\ .option("user", user)\ .option("password", password)\ .load() remote_table.createOrReplaceTempView("YOUR_TEMP_VIEW_NAMES") sqlcontext.sql("INSERT INTO YOUR_TEMP_VIEW_NAMES VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

我收到以下错误:

AnalysisException:'未知要求插入的数据与目标表具有相同的列数:目标表具有5列,但是插入的数据具有4列,包括0个分区列)具有恒定值.'

AnalysisException: 'unknown requires that the data to be inserted have the same number of columns as the target table: target table has 5 column(s) but the inserted data has 4 column(s), including 0 partition column(s) having constant value(s).;'

为什么它在SQL上有效,但在通过数据块传递查询时却不起作用?如何通过pyspark插入而不会出现此错误?

Why does it work on SQL but not when passing the query through databricks? How can I insert through pyspark without getting this error?

推荐答案

这里最直接的解决方案是使用Scala单元中的JDBC.EG

The most straightforward solution here is use JDBC from a Scala cell. EG

%scala import java.util.Properties import java.sql.DriverManager val jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser") val jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword") val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver" // Create the JDBC URL without passing in the user and password parameters. val jdbcUrl = s"jdbc:sqlserver://xxxx.database.windows:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows;loginTimeout=30;" // Create a Properties() object to hold the parameters. val connectionProperties = new Properties() connectionProperties.put("user", s"${jdbcUsername}") connectionProperties.put("password", s"${jdbcPassword}") connectionProperties.setProperty("Driver", driverClass) val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword) val stmt = connection.createStatement() val sql = "INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')" stmt.execute(sql) connection.close()

您也可以使用pyodbc,但是默认情况下未安装SQL Server ODBC驱动程序,而JDBC驱动程序已安装.

You could use pyodbc too, but the SQL Server ODBC drivers aren't installed by default, and the JDBC drivers are.

Spark解决方案是在SQL Server中创建一个视图,然后针对该视图进行插入.例如

A Spark solution would be to create a view in SQL Server and insert against that. eg

create view Validation2 as select AppId,Date,RuleName,Value from Validation

然后

tableName = "Validation2" df = spark.read.jdbc(url=jdbcUrl, table=tableName, properties=connectionProperties) df.createOrReplaceTempView(tableName) sqlContext.sql("INSERT INTO Validation2 VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

如果要封装Scala并从另一种语言(例如Python)调用它,则可以使用Scala 包装单元格.

If you want to encapsulate the Scala and call it from another language (like Python), you can use a scala package cell.

例如

%scala package example import java.util.Properties import java.sql.DriverManager object JDBCFacade { def runStatement(url : String, sql : String, userName : String, password: String): Unit = { val connection = DriverManager.getConnection(url, userName, password) val stmt = connection.createStatement() try { stmt.execute(sql) } finally { connection.close() } } }

然后您可以这样称呼它:

and then you can call it like this:

jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser") jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword") jdbcUrl = "jdbc:sqlserver://xxxx.database.windows:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows;loginTimeout=30;" sql = "select 1 a into #foo from sys.objects" sc._jvm.example.JDBCFacade.runStatement(jdbcUrl,sql, jdbcUsername, jdbcPassword)

更多推荐

无法使用PySpark插入SQL,但可以在SQL中使用

本文发布于:2023-11-09 22:53:05,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1573649.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:PySpark   SQL

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!