दिलचस्प पोस्ट
जावा: दोनों एक ही पैकेज में हैं, तो मैं मौजूदा एक स्टैंडअलोन आवेदन कैसे शुरू करूं? डबल घुंघराले ब्रेसिज़ के साथ AngularJS-Twig संघर्ष आप जावास्क्रिप्ट के साथ सी.एस.एस. नियम मानों को कैसे पढ़ते हैं? एंड्रॉइड के साथ WCF सेवा कैसे खपत करें स्ट्रिंग के रूप में अपने कच्चे एसक्यूएल क्वेरी को आउटपुट करने के लिए मैं क्वेरी बिल्डर कैसे प्राप्त करूं? एंड्रॉइड चुटकी ज़ूम 'जेड' शब्दशः के साथ सरलीकृत प्रारूप पार्सिंग की तारीख बिटमैप आकार Vm बजट त्रुटि Android से अधिक है SSL में उपयोग करने के लिए जावा कीस्टोर में मौजूदा x509 प्रमाणपत्र और निजी कुंजी को कैसे आयात करें? कई इकाई फ़्रेमवर्क में कई को डालें / अपडेट करें मैं यह कैसे करुं? UITableView पंक्ति नंबर को कैसे जानिए क्रॉस-ऑल संसाधन साझाकरण (सीओआरएस) पोस्ट अनुरोध काम करने के लिए कैसे करें Ggplot2 द्वारा उत्पन्न भूखंड के नीचे पाठ प्रदर्शित करना लंबे समय से चल रहे कार्य के परिणामों के साथ बार-बार अपडेट करें जेएलैबल आप यह सुनिश्चित कैसे करते हैं कि आप जिस प्रोग्राम को ईमेल भेजते हैं वह स्पैम के रूप में स्वचालित रूप से चिह्नित नहीं है?

स्पर्क डेटाफ़्रेम में प्रति समूह अधिकतम पंक्ति खोजें

मैं RDDs के बजाय स्पार्क डेटाफ्रेम का उपयोग करने की कोशिश कर रहा हूं क्योंकि वे RDD से अधिक उच्च स्तर के होते हैं और अधिक पठनीय कोड तैयार करते हैं, लेकिन मैं काम के लिए और अधिक मुहावरे के लिए सुझाव प्राप्त करने में बहुत खुश हूं।

14-नोड्स Google Dataproc क्लस्टर में, मेरे पास 6 लाख नाम हैं जो आईडी को दो अलग-अलग सिस्टमों द्वारा अनुवादित किए गए हैं: sa और sb प्रत्येक Row में name , id_sa और id_sb शामिल हैं मेरा लक्ष्य id_sa से id_sa से एक मैपिंग का निर्माण करना है, जैसे कि प्रत्येक id_sa , id_sb से जुड़ी सभी नामों के साथ संबंधित id_sb सबसे अक्सर आईडी है

एक उदाहरण के साथ स्पष्ट करने की कोशिश करते हैं। अगर मेरे पास निम्न पंक्तियाँ हैं:

 [Row(name='n1', id_sa='a1', id_sb='b1'), Row(name='n2', id_sa='a1', id_sb='b2'), Row(name='n3', id_sa='a1', id_sb='b2'), Row(name='n4', id_sa='a2', id_sb='b2')] 

मेरा लक्ष्य a1 से b2 तक मानचित्रण का निर्माण करना है I दरअसल, a1 n1 से जुड़े नाम n2 n1 , n2 और n3 , जो क्रमशः b1 , b2 और b2 b2 लिए नक्शे हैं, इसलिए b2 में a1 जुड़े नामों में सबसे लगातार मानचित्रण है। उसी तरह, a2 b2 को b2 मैप किया जाएगा। यह मानना ​​ठीक है कि हमेशा एक विजेता होगा: संबंधों को तोड़ने की कोई आवश्यकता नहीं है।

मुझे उम्मीद थी कि मैं अपने groupBy(df.id_sa) पर groupBy(df.id_sa) उपयोग कर सकता हूं, लेकिन मुझे नहीं पता कि आगे क्या करना है मैं एक ऐसी एकत्रीकरण की आशा कर रहा था जो अंत में, निम्नलिखित पंक्तियों का उत्पादन कर सकता है:

 [Row(id_sa=a1, max_id_sb=b2), Row(id_sa=a2, max_id_sb=b2)] 

लेकिन शायद मैं गलत उपकरण का उपयोग करने की कोशिश कर रहा हूं और मुझे आरडीडी का इस्तेमाल करने के लिए वापस जाना चाहिए।

वेब के समाधान से एकत्रित समाधान "स्पर्क डेटाफ़्रेम में प्रति समूह अधिकतम पंक्ति खोजें"

join का उपयोग करना (यह संबंधों के मामले में समूह में एक से अधिक पंक्ति का परिणाम देगा):

 import pyspark.sql.functions as F from pyspark.sql.functions import count, col cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") cnts.join(maxs, (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) ).select(col("cnts.id_sa"), col("cnts.id_sb")) 

विंडो फ़ंक्शंस का उपयोग करना (संबंधों को छोड़ देगा):

 from pyspark.sql.functions import rowNumber from pyspark.sql.window import Window w = Window().partitionBy("id_sa").orderBy(col("cnt").desc()) (cnts .withColumn("rn", rowNumber().over(w)) .where(col("rn") == 1) .select("id_sa", "id_sb")) 

struct आदेश का उपयोग करना:

 from pyspark.sql.functions import struct (cnts .groupBy("id_sa") .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max")) .select(col("id_sa"), col("max.id_sb"))) 

स्प्रेड डेटाफ़्रेम भी देखें : प्रत्येक समूह की पहली पंक्ति चुनें

मुझे लगता है कि आप जो फ़ंक्शंस देख रहे हैं वह विंडो फ़ंक्शंस हैं: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

यहां स्काला में एक उदाहरण है (मेरे पास अभी तक उपलब्ध हाइव वाला स्पार्क शैल नहीं है, इसलिए मैं कोड का परीक्षण करने में सक्षम नहीं था, लेकिन मुझे लगता है कि यह काम करना चाहिए):

 case class MyRow(name: String, id_sa: String, id_sb: String) val myDF = sc.parallelize(Array( MyRow("n1", "a1", "b1"), MyRow("n2", "a1", "b2"), MyRow("n3", "a1", "b2"), MyRow("n1", "a2", "b2") )).toDF("name", "id_sa", "id_sb") import org.apache.spark.sql.expressions.Window val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc) myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb") 

विंडो फ़ंक्शन के साथ समान परिणाम प्राप्त करने के लिए संभवत: अधिक प्रभावी तरीके हैं, लेकिन मुझे आशा है कि यह आपको सही दिशा में बताता है।