programing

Apache Spark DataFrame에서 열 연결

telecom 2023. 10. 17. 20:01
반응형

Apache Spark DataFrame에서 열 연결

Apache Spark DataFrame에서 두 열을 연결하려면 어떻게 해야 합니까?Spark SQL에 우리가 사용할 수 있는 기능이 있습니까?

원시 SQL을 사용하면 사용할 수 있습니다.CONCAT:

  • 파이썬에서

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • 스칼라에서

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

스파크 1.5.0 이후 사용 가능concatDataFrame API로 기능:

  • Python에서:

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
    
  • 스칼라에서:

    import org.apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))
    

또한 있습니다.concat_ws문자열 구분 기호를 첫 번째 인수로 사용하는 함수.

사용자 지정 이름 지정 방법은 다음과 같습니다.

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

주는.

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

연결하여 새 열을 만듭니다.

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+

Spark Scala에서 문자열 열을 연결하는 하나의 옵션은 다음과 같습니다.concat.

null 값을 확인해야 합니다.열 중 하나가 null이면 다른 열 중 하나에 정보가 있어도 결과가 null이 되기 때문입니다.

사용.concat그리고.withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

사용.concat그리고.select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

두 접근법 모두에서 NEW_COLUMN 값을 갖게 되며, 이 값은 원래 df에서 COL1과 COL2 열의 연결 값이 됩니다.

콘캣 (*콜스)

v1.5 이상

여러 입력 열을 하나의 열로 연결합니다.함수는 문자열, 이진 및 호환 배열 열과 함께 작동합니다.

예:new_df = df.select(concat(df.a, df.b, df.c))


concat_ws(sep, *cols)

v1.5 이상

와 유사합니다.concat지정된 구분 기호를 사용합니다.

예:new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat (*cols)

v2.4 이상

맵을 조정하는 데 사용되며, 모든 주어진 맵의 조합을 반환합니다.

예:new_df = df.select(map_concat("map1", "map2"))


concat 연산자 사용(||):

v2.3 이상

예:df = spark.sql("select col_a || col_b || col_c as abc from table_x")

참조 : Spark sql doc

DF를 사용하여 수행하려면 udf를 사용하여 기존 열을 기준으로 새 열을 추가할 수 있습니다.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()

Spark 2.3(SPARK-22771)부터 Spark SQL은 연결 연산자를 지원합니다.||.

예를 들어,

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")

다음은 pyspark를 위한 또 다른 방법입니다.

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+

다음은 데이터 프레임에 있는 열의 개수나 이름을 모를 때를 위한 제안입니다.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

아래 프로세스에 해당하는 자바 구문이 있습니까?

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

Spark 2.3.0에서는 다음 작업을 수행할 수 있습니다.

spark.sql( """ select '1' || column_a from table_a """)

Java에서는 여러 열을 연결하는 작업을 수행할 수 있습니다.샘플 코드는 시나리오와 이해를 돕기 위한 사용 방법을 제공하기 위한 것입니다.

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
                        .withColumn("concatenatedCol",
                                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}

위의 코드는 col1,col2,col3을 "_"로 구분하여 "contatatedCol"이라는 이름을 가진 열을 만듭니다.

저 같은 경우에는 파이프로 구분된 행을 원했거든요.

from pyspark.sql import functions as F
df.select(F.concat_ws('|','_c1','_c2','_c3','_c4')).show()

이것은 버터 위의 뜨거운 칼처럼 잘 작동했습니다.

다음과 같은 Concat 방법을 사용합니다.

Dataset<Row> DF2 = DF1
            .withColumn("NEW_COLUMN",concat(col("ADDR1"),col("ADDR2"),col("ADDR3"))).as("NEW_COLUMN")

sqlContext를 사용하여 pySpark에서 수행하는 또 다른 방법은...

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))

실제로 사용자 지정 기능을 구현할 필요 없이 연결을 수행할 수 있는 아름다운 기본 제공 추상화가 있습니다.당신이 Spark SQL을 언급했으므로 spark.sql()을 통해 선언적 명령으로 전달하려는 것으로 추측됩니다.그렇다면 다음과 같은 SQL 명령을 바로 전달할 수 있습니다.SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

또한 Spark 2.3.0부터는 다음과 같은 라인에서 명령을 사용할 수 있습니다.SELECT col1 || col2 AS concat_column_name FROM <table_name>;

여기서 은 원하는 구분 기호(빈 공간일 수도 있음)이며 읽으려는 임시 또는 영구 테이블입니다.

간단하게 사용할 수 있습니다.SelectExpr뿐만 아니라.

df1.selectExpr("*","upper(_2||_3) as new")

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

참고: 이 코드가 작동하려면 "IsNotNull()" 함수에 괄호(")를 넣어야 합니다. -> 정확한 코드는 "IsNotNull()"입니다.

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull(), col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull(), col("COL2")).otherwise(lit("null"))))

언급URL : https://stackoverflow.com/questions/31450846/concatenate-columns-in-apache-spark-dataframe

반응형