PySpark Pandas_Udf()

Pyspark Pandas Udf



Det är möjligt att transformera PySpark DataFrame med funktionen pandas_udf(). Det är en användardefinierad funktion som appliceras på PySpark DataFrame med pil. Vi kan utföra de vektoriserade operationerna med hjälp av pandas_udf(). Det kan implementeras genom att passera denna funktion som dekoratör. Låt oss dyka in i den här guiden för att känna till syntaxen, parametrarna och olika exempel.

Ämne för innehåll:

Om du vill veta mer om PySpark DataFrame och modulinstallation, gå igenom detta artikel .







Pyspark.sql.functions.pandas_udf()

pandas_udf () är tillgänglig i modulen sql.functions i PySpark som kan importeras med nyckelordet 'från'. Den används för att utföra vektoriserade operationer på vår PySpark DataFrame. Denna funktion implementeras som en dekoratör genom att skicka tre parametrar. Efter det kan vi skapa en användardefinierad funktion som returnerar data i vektorformatet (som vi använder serier/NumPy för detta) med hjälp av en pil. Inom denna funktion kan vi returnera resultatet.



Struktur och syntax:



Låt oss först titta på strukturen och syntaxen för denna funktion:

@pandas_udf(datatyp)
def function_name(operation) -> convert_format:
returuppgift

Här är function_name namnet på vår definierade funktion. Datatypen anger datatypen som returneras av denna funktion. Vi kan returnera resultatet med nyckelordet 'retur'. Alla operationer utförs i funktionen med piltilldelningen.





Pandas_udf (Function och ReturnType)

  1. Den första parametern är den användardefinierade funktionen som skickas till den.
  2. Den andra parametern används för att specificera returdatatypen från funktionen.

Data:

I hela den här guiden använder vi endast en PySpark DataFrame för demonstration. Alla användardefinierade funktioner som vi definierar tillämpas på denna PySpark DataFrame. Se till att du skapar denna DataFrame i din miljö först efter installationen av PySpark.



importera pyspark

från pyspark.sql importera SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tips' ).getOrCreate()

från pyspark.sql.functions importera pandas_udf

från pyspark.sql.types import *

importera pandor som panda

# grönsaksdetaljer

grönsak =[{ 'typ' : 'grönsak' , 'namn' : 'tomat' , 'lokalisera_land' : 'USA' , 'kvantitet' : 800 },

{ 'typ' : 'frukt' , 'namn' : 'banan' , 'lokalisera_land' : 'KINA' , 'kvantitet' : tjugo },

{ 'typ' : 'grönsak' , 'namn' : 'tomat' , 'lokalisera_land' : 'USA' , 'kvantitet' : 800 },

{ 'typ' : 'grönsak' , 'namn' : 'Mango' , 'lokalisera_land' : 'JAPAN' , 'kvantitet' : 0 },

{ 'typ' : 'frukt' , 'namn' : 'citron' , 'lokalisera_land' : 'INDIEN' , 'kvantitet' : 1700 },

{ 'typ' : 'grönsak' , 'namn' : 'tomat' , 'lokalisera_land' : 'USA' , 'kvantitet' : 1200 },

{ 'typ' : 'grönsak' , 'namn' : 'Mango' , 'lokalisera_land' : 'JAPAN' , 'kvantitet' : 0 },

{ 'typ' : 'frukt' , 'namn' : 'citron' , 'lokalisera_land' : 'INDIEN' , 'kvantitet' : 0 }

]

# skapa marknadsdataramen från ovanstående data

market_df = linuxhint_spark_app.createDataFrame(grönsak)

market_df.show()

Produktion:

Här skapar vi denna DataFrame med 4 kolumner och 8 rader. Nu använder vi pandas_udf() för att skapa de användardefinierade funktionerna och tillämpa dem på dessa kolumner.

Pandas_udf() med olika datatyper

I det här scenariot skapar vi några användardefinierade funktioner med pandas_udf() och tillämpar dem på kolumner och visar resultaten med metoden select(). I varje fall använder vi pandas.Series när vi utför vektoriserade operationer. Detta betraktar kolumnvärdena som en endimensionell array och operationen tillämpas på kolumnen. I själva dekoratören anger vi funktionen returtyp.

Exempel 1: Pandas_udf() med strängtyp

Här skapar vi två användardefinierade funktioner med strängreturtypen för att konvertera kolumnvärdena för strängtyp till versaler och gemener. Slutligen tillämpar vi dessa funktioner på kolumnerna 'typ' och 'lokalisera_land'.

# Konvertera typkolumn till versaler med pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

returnera i.str.upper()

# Konvertera kolumnen locate_country till gemener med pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

returnera i.str.lower()

# Visa kolumnerna med select()

market_df.select( 'typ' ,typ_versaler( 'typ' ), 'lokalisera_land' ,
land_små_case( 'lokalisera_land' )).show()

Produktion:

Förklaring:

Funktionen StringType() är tillgänglig i modulen pyspark.sql.types. Vi importerade redan den här modulen när vi skapade PySpark DataFrame.

  1. Först returnerar UDF (användardefinierad funktion) strängarna med versaler med funktionen str.upper(). Str.upper() är tillgänglig i seriedatastrukturen (eftersom vi konverterar till serier med en pil inuti funktionen) som konverterar den givna strängen till versaler. Slutligen tillämpas den här funktionen på kolumnen 'type' som anges i metoden select(). Tidigare var alla strängar i typkolumnen med gemener. Nu har de ändrats till versaler.
  2. För det andra returnerar UDF strängarna med versaler med funktionen str.lower() . Str.lower() är tillgänglig i seriedatastrukturen som konverterar den givna strängen till gemener. Slutligen tillämpas den här funktionen på kolumnen 'type' som anges i metoden select(). Tidigare var alla strängar i typkolumnen med versaler. Nu har de ändrats till gemener.

Exempel 2: Pandas_udf() med heltalstyp

Låt oss skapa en UDF som konverterar heltalskolumnen PySpark DataFrame till Pandas-serien och lägger till 100 till varje värde. Skicka 'quantity'-kolumnen till den här funktionen i select()-metoden.

# Lägg till 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

returnera i+ 100

# Skicka kvantitetskolumnen till ovanstående funktion och display.

market_df.select( 'kvantitet' ,lägg till_100( 'kvantitet' )).show()

Produktion:

Förklaring:

Inuti UDF itererar vi alla värden och konverterar dem till serier. Efter det lägger vi till 100 till varje värde i serien. Slutligen skickar vi kolumnen 'kvantitet' till denna funktion och vi kan se att 100 läggs till alla värden.

Pandas_udf() med olika datatyper som använder Groupby() & Agg()

Låt oss titta på exemplen för att skicka UDF till de aggregerade kolumnerna. Här grupperas kolumnvärdena först med funktionen groupby() och aggregering görs med funktionen agg(). Vi skickar vår UDF inuti denna aggregatfunktion.

Syntax:

pyspark_dataframe_object.groupby( 'grouping_column' ).agg(UDF
(pyspark_dataframe_object[ 'kolumn' ]))

Här grupperas värdena i grupperingskolumnen först. Sedan görs aggregeringen på varje grupperad data med avseende på vår UDF.

Exempel 1: Pandas_udf() med Aggregate Mean()

Här skapar vi en användardefinierad funktion med en returtyp flytande. Inuti funktionen beräknar vi medelvärdet med hjälp av funktionen mean() . Denna UDF skickas till kolumnen 'kvantitet' för att få den genomsnittliga kvantiteten för varje typ.

# returnera medelvärdet/genomsnittet

@pandas_udf( 'flyta' )

def average_function(i: panda.Series) -> flyta:

returnera i.mean()

# Skicka kvantitetskolumnen till funktionen genom att gruppera typkolumnen.

market_df.groupby( 'typ' ).agg(average_function(market_df[ 'kvantitet' ])).show()

Produktion:

Vi grupperar baserat på element i kolumnen 'typ'. Två grupper bildas - 'frukt' och 'grönsaker'. För varje grupp beräknas medelvärdet och returneras.

Exempel 2: Pandas_udf() med Aggregate Max() och Min()

Här skapar vi två användardefinierade funktioner med returtypen heltal (int). Den första UDF:en returnerar minimivärdet och den andra UDF:en returnerar det högsta värdet.

# pandas_udf som returnerar det lägsta värdet

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

returnera i.min()

# pandas_udf som returnerar det maximala värdet

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

returnera i.max()

# Skicka mängdkolumnen till min_ pandas_udf genom att gruppera locate_country.

market_df.groupby( 'lokalisera_land' ).agg(min_(marknad_df[ 'kvantitet' ])).show()

# Skicka kvantitetskolumnen till max_ pandas_udf genom att gruppera locate_country.

market_df.groupby( 'lokalisera_land' ).agg(max_(market_df[ 'kvantitet' ])).show()

Produktion:

För att returnera lägsta och maximala värden använder vi funktionerna min() och max() i returtypen för UDF. Nu grupperar vi data i kolumnen 'locate_country'. Fyra grupper bildas ('KINA', 'INDIEN', 'JAPAN', 'USA'). För varje grupp returnerar vi maxkvantiteten. På samma sätt returnerar vi minimikvantiteten.

Slutsats

I grund och botten används pandas_udf () för att utföra vektoriserade operationer på vår PySpark DataFrame. Vi har sett hur man skapar pandas_udf() och tillämpar den på PySpark DataFrame. För bättre förståelse diskuterade vi de olika exemplen genom att överväga alla datatyper (sträng, flytande och heltal). Det kan vara möjligt att använda pandas_udf() med groupby() genom funktionen agg().