Apache Spark DataFrame에서 열 연결
Apache Spark DataFrame에서 두 열을 연결하려면 어떻게 해야 합니까?Spark SQL에 우리가 사용할 수 있는 기능이 있습니까?
원시 SQL을 사용하면 사용할 수 있습니다.CONCAT
:
파이썬에서
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
스칼라에서
import sqlContext.implicits._ val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
스파크 1.5.0 이후 사용 가능concat
DataFrame API로 기능:
Python에서:
from pyspark.sql.functions import concat, col, lit df.select(concat(col("k"), lit(" "), col("v")))
스칼라에서:
import org.apache.spark.sql.functions.{concat, lit} df.select(concat($"k", lit(" "), $"v"))
또한 있습니다.concat_ws
문자열 구분 기호를 첫 번째 인수로 사용하는 함수.
사용자 지정 이름 지정 방법은 다음과 같습니다.
import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()
주는.
+--------+--------+
|colname1|colname2|
+--------+--------+
| row11| row12|
| row21| row22|
+--------+--------+
연결하여 새 열을 만듭니다.
df = df.withColumn('joined_column',
sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()
+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
| row11| row12| row11_row12|
| row21| row22| row21_row22|
+--------+--------+-------------+
Spark Scala에서 문자열 열을 연결하는 하나의 옵션은 다음과 같습니다.concat
.
null 값을 확인해야 합니다.열 중 하나가 null이면 다른 열 중 하나에 정보가 있어도 결과가 null이 되기 때문입니다.
사용.concat
그리고.withColumn
:
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))
사용.concat
그리고.select
:
val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")
두 접근법 모두에서 NEW_COLUMN 값을 갖게 되며, 이 값은 원래 df에서 COL1과 COL2 열의 연결 값이 됩니다.
콘캣 (*콜스)
v1.5 이상
여러 입력 열을 하나의 열로 연결합니다.함수는 문자열, 이진 및 호환 배열 열과 함께 작동합니다.
예:new_df = df.select(concat(df.a, df.b, df.c))
concat_ws(sep, *cols)
v1.5 이상
와 유사합니다.concat
지정된 구분 기호를 사용합니다.
예:new_df = df.select(concat_ws('-', df.col1, df.col2))
map_concat (*cols)
v2.4 이상
맵을 조정하는 데 사용되며, 모든 주어진 맵의 조합을 반환합니다.
예:new_df = df.select(map_concat("map1", "map2"))
concat 연산자 사용(||
):
v2.3 이상
예:df = spark.sql("select col_a || col_b || col_c as abc from table_x")
참조 : Spark sql doc
DF를 사용하여 수행하려면 udf를 사용하여 기존 열을 기준으로 새 열을 추가할 수 있습니다.
val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)
//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))
//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )
//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
Spark 2.3(SPARK-22771)부터 Spark SQL은 연결 연산자를 지원합니다.||
.
예를 들어,
val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
다음은 pyspark를 위한 또 다른 방법입니다.
#import concat and lit functions from pyspark.sql.functions
from pyspark.sql.functions import concat, lit
#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])
#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))
#Show the new data frame
personDF.show()
----------RESULT-------------------------
84
+------------+
|East African|
+------------+
| Ethiopian|
| Kenyan|
| Ugandan|
| Rwandan|
+------------+
다음은 데이터 프레임에 있는 열의 개수나 이름을 모를 때를 위한 제안입니다.
val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
아래 프로세스에 해당하는 자바 구문이 있습니까?
val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
Spark 2.3.0에서는 다음 작업을 수행할 수 있습니다.
spark.sql( """ select '1' || column_a from table_a """)
Java에서는 여러 열을 연결하는 작업을 수행할 수 있습니다.샘플 코드는 시나리오와 이해를 돕기 위한 사용 방법을 제공하기 위한 것입니다.
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
.withColumn("concatenatedCol",
concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));
class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession.builder().config(sparkConf)
.getOrCreate();
}
return instance;
}
}
위의 코드는 col1,col2,col3을 "_"로 구분하여 "contatatedCol"이라는 이름을 가진 열을 만듭니다.
저 같은 경우에는 파이프로 구분된 행을 원했거든요.
from pyspark.sql import functions as F
df.select(F.concat_ws('|','_c1','_c2','_c3','_c4')).show()
이것은 버터 위의 뜨거운 칼처럼 잘 작동했습니다.
다음과 같은 Concat 방법을 사용합니다.
Dataset<Row> DF2 = DF1
.withColumn("NEW_COLUMN",concat(col("ADDR1"),col("ADDR2"),col("ADDR3"))).as("NEW_COLUMN")
sqlContext를 사용하여 pySpark에서 수행하는 또 다른 방법은...
#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])
# Now we can concatenate columns and assign the new column a name
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))
실제로 사용자 지정 기능을 구현할 필요 없이 연결을 수행할 수 있는 아름다운 기본 제공 추상화가 있습니다.당신이 Spark SQL을 언급했으므로 spark.sql()을 통해 선언적 명령으로 전달하려는 것으로 추측됩니다.그렇다면 다음과 같은 SQL 명령을 바로 전달할 수 있습니다.SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;
또한 Spark 2.3.0부터는 다음과 같은 라인에서 명령을 사용할 수 있습니다.SELECT col1 || col2 AS concat_column_name FROM <table_name>;
여기서 은 원하는 구분 기호(빈 공간일 수도 있음)이며 읽으려는 임시 또는 영구 테이블입니다.
간단하게 사용할 수 있습니다.SelectExpr
뿐만 아니라.
df1.selectExpr("*","upper(_2||_3) as new")
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))
참고: 이 코드가 작동하려면 "IsNotNull()" 함수에 괄호(")를 넣어야 합니다. -> 정확한 코드는 "IsNotNull()"입니다.
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull(), col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull(), col("COL2")).otherwise(lit("null"))))
언급URL : https://stackoverflow.com/questions/31450846/concatenate-columns-in-apache-spark-dataframe
'programing' 카테고리의 다른 글
window.location일 때 이벤트.href 변경사항 (0) | 2023.10.17 |
---|---|
C의 부호 없는 값 (0) | 2023.10.17 |
C의 잠정적인 정의의 근거는 무엇입니까? (0) | 2023.10.17 |
C에서 2d 배열을 자유롭게 하는 방법? (0) | 2023.10.17 |
NSFetchRequest 인스턴스에 유형을 적용하는 방법은 무엇입니까? (0) | 2023.10.17 |