PostgreSQL
 sql >> Baza danych >  >> RDS >> PostgreSQL

Klucze podstawowe z Apache Spark

Scala :

Jeśli potrzebujesz tylko unikalnych numerów, możesz użyć zipWithUniqueId i odtworzyć DataFrame. Najpierw kilka importów i fikcyjnych danych:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Wyodrębnij schemat do dalszego wykorzystania:

val schema = df.schema

Dodaj pole identyfikatora:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Utwórz ramkę danych:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

To samo w Pythonie :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Jeśli wolisz numer kolejny, możesz zastąpić zipWithUniqueId z zipWithIndex ale jest trochę droższy.

Bezpośrednio z DataFrame API :

(uniwersalna Scala, Python, Java, R z prawie taką samą składnią)

Wcześniej przegapiłem monotonicallyIncreasingId funkcja, która powinna działać dobrze, o ile nie potrzebujesz kolejnych liczb:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Chociaż przydatne monotonicallyIncreasingId jest niedeterministyczny. Nie tylko identyfikatory mogą się różnić od wykonania do wykonania, ale bez dodatkowych sztuczek nie można ich użyć do identyfikacji wierszy, gdy kolejne operacje zawierają filtry.

Uwaga :

Możliwe jest również użycie rowNumber funkcja okna:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Niestety:

WARN Window:Nie zdefiniowano partycji do obsługi okna! Przeniesienie wszystkich danych na pojedynczą partycję może spowodować poważne pogorszenie wydajności.

Więc jeśli nie masz naturalnego sposobu na partycjonowanie danych i zapewnienie ich wyjątkowości, nie jest to szczególnie przydatne w tej chwili.



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Czy mogę wycofać już zatwierdzoną transakcję? (utrata danych)

  2. Deklaratywny SQLAlchemy:definiowanie wyzwalaczy i indeksów (Postgres 9)

  3. Zapytanie SQL, aby uzyskać najnowszy wiersz dla każdego wystąpienia danego klucza

  4. Jak zmienić schemat wielu tabel PostgreSQL w jednej operacji?

  5. Czy w nazwach kolumn PostgreSQL jest rozróżniana wielkość liter?