Scala :
Jeśli potrzebujesz tylko unikalnych numerów, możesz użyć zipWithUniqueId
i odtworzyć DataFrame. Najpierw kilka importów i fikcyjnych danych:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Wyodrębnij schemat do dalszego wykorzystania:
val schema = df.schema
Dodaj pole identyfikatora:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Utwórz ramkę danych:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
To samo w Pythonie :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Jeśli wolisz numer kolejny, możesz zastąpić zipWithUniqueId
z zipWithIndex
ale jest trochę droższy.
Bezpośrednio z DataFrame
API :
(uniwersalna Scala, Python, Java, R z prawie taką samą składnią)
Wcześniej przegapiłem monotonicallyIncreasingId
funkcja, która powinna działać dobrze, o ile nie potrzebujesz kolejnych liczb:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Chociaż przydatne monotonicallyIncreasingId
jest niedeterministyczny. Nie tylko identyfikatory mogą się różnić od wykonania do wykonania, ale bez dodatkowych sztuczek nie można ich użyć do identyfikacji wierszy, gdy kolejne operacje zawierają filtry.
Uwaga :
Możliwe jest również użycie rowNumber
funkcja okna:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Niestety:
WARN Window:Nie zdefiniowano partycji do obsługi okna! Przeniesienie wszystkich danych na pojedynczą partycję może spowodować poważne pogorszenie wydajności.
Więc jeśli nie masz naturalnego sposobu na partycjonowanie danych i zapewnienie ich wyjątkowości, nie jest to szczególnie przydatne w tej chwili.