दिलचस्प पोस्ट
जावा XPath (Apache JAXP कार्यान्वयन) प्रदर्शन आईटरेटिव डीएफएस बनाम रिकर्सिव डीएफएस और विभिन्न तत्वों के आदेश कंस्ट्रक्चर मान क्यों नहीं लौटते? डेटा। योग्य में एची Java में INI फ़ाइल को पार्स करने का सबसे आसान तरीका क्या है? java: फ़्लोट को फ्लोट में स्ट्रिंग और स्ट्रिंग में कनवर्ट करें \ r \ n, \ r, \ n उन दोनों के बीच अंतर क्या है? रूबी में कोई वेतन वृद्धि ऑपरेटर (++) नहीं है? जावा के साथ hdfs में एक फाइल लिखें गलत स्ट्रिंग मान: '\ xF0 \ x9F \ x8E \ xB6 \ xF0 \ x9F …' MySQL विंडोज़ के तहत PHP पहुंच नेटवर्क पथ JQuery के माध्यम से इसे सबमिट किए बिना एक html5 फॉर्म मान्यता को कैसे लागू करें अंगुल्य 2 और ऑबोबॉबल्स: 'एनजीएमोडेल' से बाध्य नहीं किया जा सकता क्योंकि यह 'चयन' की ज्ञात संपत्ति नहीं है एंड्रॉइड: एंड्रॉइड। आर.आईड.कॉन्टान्ट क्या है? समान नियमों के साथ कई निष्पादनयोग्य निर्माण

स्पर्क डेटाफ़्रेम में प्रति समूह अधिकतम पंक्ति खोजें

मैं RDDs के बजाय स्पार्क डेटाफ्रेम का उपयोग करने की कोशिश कर रहा हूं क्योंकि वे RDD से अधिक उच्च स्तर के होते हैं और अधिक पठनीय कोड तैयार करते हैं, लेकिन मैं काम के लिए और अधिक मुहावरे के लिए सुझाव प्राप्त करने में बहुत खुश हूं।

14-नोड्स Google Dataproc क्लस्टर में, मेरे पास 6 लाख नाम हैं जो आईडी को दो अलग-अलग सिस्टमों द्वारा अनुवादित किए गए हैं: sa और sb प्रत्येक Row में name , id_sa और id_sb शामिल हैं मेरा लक्ष्य id_sa से id_sa से एक मैपिंग का निर्माण करना है, जैसे कि प्रत्येक id_sa , id_sb से जुड़ी सभी नामों के साथ संबंधित id_sb सबसे अक्सर आईडी है

एक उदाहरण के साथ स्पष्ट करने की कोशिश करते हैं। अगर मेरे पास निम्न पंक्तियाँ हैं:

 [Row(name='n1', id_sa='a1', id_sb='b1'), Row(name='n2', id_sa='a1', id_sb='b2'), Row(name='n3', id_sa='a1', id_sb='b2'), Row(name='n4', id_sa='a2', id_sb='b2')] 

मेरा लक्ष्य a1 से b2 तक मानचित्रण का निर्माण करना है I दरअसल, a1 n1 से जुड़े नाम n2 n1 , n2 और n3 , जो क्रमशः b1 , b2 और b2 b2 लिए नक्शे हैं, इसलिए b2 में a1 जुड़े नामों में सबसे लगातार मानचित्रण है। उसी तरह, a2 b2 को b2 मैप किया जाएगा। यह मानना ​​ठीक है कि हमेशा एक विजेता होगा: संबंधों को तोड़ने की कोई आवश्यकता नहीं है।

मुझे उम्मीद थी कि मैं अपने groupBy(df.id_sa) पर groupBy(df.id_sa) उपयोग कर सकता हूं, लेकिन मुझे नहीं पता कि आगे क्या करना है मैं एक ऐसी एकत्रीकरण की आशा कर रहा था जो अंत में, निम्नलिखित पंक्तियों का उत्पादन कर सकता है:

 [Row(id_sa=a1, max_id_sb=b2), Row(id_sa=a2, max_id_sb=b2)] 

लेकिन शायद मैं गलत उपकरण का उपयोग करने की कोशिश कर रहा हूं और मुझे आरडीडी का इस्तेमाल करने के लिए वापस जाना चाहिए।

वेब के समाधान से एकत्रित समाधान "स्पर्क डेटाफ़्रेम में प्रति समूह अधिकतम पंक्ति खोजें"

join का उपयोग करना (यह संबंधों के मामले में समूह में एक से अधिक पंक्ति का परिणाम देगा):

 import pyspark.sql.functions as F from pyspark.sql.functions import count, col cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") cnts.join(maxs, (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) ).select(col("cnts.id_sa"), col("cnts.id_sb")) 

विंडो फ़ंक्शंस का उपयोग करना (संबंधों को छोड़ देगा):

 from pyspark.sql.functions import rowNumber from pyspark.sql.window import Window w = Window().partitionBy("id_sa").orderBy(col("cnt").desc()) (cnts .withColumn("rn", rowNumber().over(w)) .where(col("rn") == 1) .select("id_sa", "id_sb")) 

struct आदेश का उपयोग करना:

 from pyspark.sql.functions import struct (cnts .groupBy("id_sa") .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max")) .select(col("id_sa"), col("max.id_sb"))) 

स्प्रेड डेटाफ़्रेम भी देखें : प्रत्येक समूह की पहली पंक्ति चुनें

मुझे लगता है कि आप जो फ़ंक्शंस देख रहे हैं वह विंडो फ़ंक्शंस हैं: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

यहां स्काला में एक उदाहरण है (मेरे पास अभी तक उपलब्ध हाइव वाला स्पार्क शैल नहीं है, इसलिए मैं कोड का परीक्षण करने में सक्षम नहीं था, लेकिन मुझे लगता है कि यह काम करना चाहिए):

 case class MyRow(name: String, id_sa: String, id_sb: String) val myDF = sc.parallelize(Array( MyRow("n1", "a1", "b1"), MyRow("n2", "a1", "b2"), MyRow("n3", "a1", "b2"), MyRow("n1", "a2", "b2") )).toDF("name", "id_sa", "id_sb") import org.apache.spark.sql.expressions.Window val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc) myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb") 

विंडो फ़ंक्शन के साथ समान परिणाम प्राप्त करने के लिए संभवत: अधिक प्रभावी तरीके हैं, लेकिन मुझे आशा है कि यह आपको सही दिशा में बताता है।