दिलचस्प पोस्ट
मैं खाली जावास्क्रिप्ट ऑब्जेक्ट के लिए कैसे परीक्षण करूं? invalid_grant Google से oAuth टोकन प्राप्त करने का प्रयास कर रहा है जावा का समर्थन अहस्ताक्षरित ints क्यों नहीं करता है? डिस्पैचर का उपयोग कर एक गैर-मुख्य थ्रेड से WPF नियंत्रण बदलें कुशल सूची प्रतिच्छेदन एल्गोरिथम MySQL: लोड डेटा स्थानीय INFILE सक्षम करें क्यों नॉन-डिटरनिस्टिकवादी ऑर्डर करने वाला डिक्शनरी है? सजाए गए कार्यों के हस्ताक्षर को संरक्षित करना LoadFile और लोड के बीच अंतर। नेट विधानसभाओं के साथ? बहुआयामी सरणी के लिए एक आयामी पहुंच: अच्छी तरह से परिभाषित सी? सूची दृश्य के माध्यम से डायनामिक रूप से जेनरेट किए गए चेकबॉक्स को चेक करते समय कोई समस्या हो रही है अपरिभाषित फ़ंक्शन mysql_connect () मैवेन में परीक्षण कोड साझा करना कैसे पता लगाया जाए कि क्या कोई स्क्रिप्ट स्रोत है? घातक त्रुटि एलएनके 1112: मॉड्यूल मशीन प्रकार 'x64' लक्ष्य मशीन प्रकार 'एक्स 86' के साथ संघर्ष

प्रत्येक समूह की पहली पंक्ति का चयन कैसे करें?

मेरे पास एक डेटाफ्रेम है, जिसका अनुसरण किया गया है:

df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc)) 

परिणाम दिखते हैं:

 +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 0| cat13| 22.1| | 0| cat95| 19.6| | 0| cat105| 1.3| | 1| cat67| 28.5| | 1| cat4| 26.8| | 1| cat13| 12.6| | 1| cat23| 5.3| | 2| cat56| 39.6| | 2| cat40| 29.7| | 2| cat187| 27.9| | 2| cat68| 9.8| | 3| cat8| 35.6| | ...| ....| ....| +----+--------+----------+ 

जैसा कि आप देख सकते हैं, डेटाफ्रेम का क्रम बढ़ते Hour में ऑर्डर किया जाता है, फिर कुलवॉल्यू द्वारा अवरोही क्रम में।

मैं प्रत्येक समूह की शीर्ष पंक्ति का चयन करना चाहूंगा, अर्थात्

  • घंटे के समूह से == 0 चुनें (0, बिल्ली 26, 30.9)
  • घंटे के समूह से == 1 का चयन करें (1, बिल्ली 67,28.5)
  • घंटे के समूह से == 2 का चयन करें (2, बिल्ली 56,39.6)
  • और इसी तरह

तो वांछित उत्पादन होगा:

 +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 1| cat67| 28.5| | 2| cat56| 39.6| | 3| cat8| 35.6| | ...| ...| ...| +----+--------+----------+ 

प्रत्येक समूह की शीर्ष एन पंक्तियों को भी चुनने में सक्षम हो सकता है

किसी भी सहायताको बहुत सराहा जाएगा।

वेब के समाधान से एकत्रित समाधान "प्रत्येक समूह की पहली पंक्ति का चयन कैसे करें?"

विंडो फ़ंक्शंस :

ऐसा कुछ ऐसा करना चाहिए:

 import org.apache.spark.sql.functions.{row_number, max, broadcast} import org.apache.spark.sql.expressions.Window val df = sc.parallelize(Seq( (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3), (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3), (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8), (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue") val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc) val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+ 

महत्वपूर्ण डेटा तिरछा के मामले में यह विधि अक्षम हो जाएगी।

join बाद सादा एसक्यूएल एकत्रीकरण :

वैकल्पिक रूप से आप एकत्रित डेटा फ्रेम में शामिल हो सकते हैं:

 val dfMax = df.groupBy($"hour").agg(max($"TotalValue")) val dfTopByJoin = df.join(broadcast(dfMax), ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value")) .drop("max_hour") .drop("max_value") dfTopByJoin.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+ 

यह डुप्लिकेट मान रखेगा (यदि एक से अधिक श्रेणी प्रति घंटे समान कुल मूल्य के साथ हैं)। आप निम्नानुसार इन्हें हटा सकते हैं:

 dfTopByJoin .groupBy($"hour") .agg( first("category").alias("category"), first("TotalValue").alias("TotalValue")) 

structs ऊपर आदेश का उपयोग करना :

साफ, हालांकि बहुत अच्छी तरह से परीक्षण नहीं किया गया, चाल जो कि जुड़ने या विंडो कार्यों की आवश्यकता नहीं है:

 val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+ 

डाटासेट एपीआई (स्पार्क 1.6+, 2.0+) के साथ:

स्पार्क 1.6 :

 case class Record(Hour: Integer, Category: String, TotalValue: Double) df.as[Record] .groupBy($"hour") .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) .show // +---+--------------+ // | _1| _2| // +---+--------------+ // |[0]|[0,cat26,30.9]| // |[1]|[1,cat67,28.5]| // |[2]|[2,cat56,39.6]| // |[3]| [3,cat8,35.6]| // +---+--------------+ 

स्पार्क 2.0 या बाद में :

 df.as[Record] .groupByKey(_.Hour) .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y) 

आखिरी दो विधियां नक्शा साइड का संयोजन कर सकती हैं और पूर्ण फेरबदल की आवश्यकता नहीं है, इसलिए अधिकांश समय विंडो फ़ंक्शन के मुकाबले बेहतर प्रदर्शन को प्रदर्शित करते हैं और जुड़ते हैं।

का प्रयोग न करें :

 df.orderBy(...).groupBy(...).agg(first(...), ...) 

ऐसा लग सकता है कि काम (विशेषकर local मोड में) पर अविश्वसनीय है ( SPARK-16207 )। संबंधित जिरा मुद्दे को जोड़ने के लिए त्शेख ज़्हहर को क्रेडिट।

उसी नोट पर लागू होता है

 df.orderBy(...).dropDuplicates(...) 

जो आंतरिक रूप से समान निष्पादन योजना का उपयोग करता है।

स्पार्क 2.0.2 के लिए कई कॉलमों के समूहिंग के साथ:

 import org.apache.spark.sql.functions.row_number import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc) val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") 

यदि डेटाफ़्रेम को कई कॉलमों द्वारा समूहीकृत किया जाना है, तो यह मदद कर सकता है

 val keys = List("Hour", "Category"); val selectFirstValueOfNoneGroupedColumns = df.columns .filterNot(keys.toSet) .map(_ -> "first").toMap val grouped = df.groupBy(keys.head, keys.tail: _*) .agg(selectFirstValueOfNoneGroupedColumns) 

उम्मीद है कि इस तरह की किसी की भी इसी तरह की समस्या है

चिंगारी के लिए> 2.0 हम बस कर सकते हैं:
groupBy($"Hour").agg(df_op.columns.map((_, "first")).toMap)

विस्तार से ओपी के सेटअप का प्रयोग करें:

 val df_op = df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc)) df_op.groupBy($"Hour").agg(df_op.columns.map((_, "first")).toMap) 

यह Compute aggregates by specifying a map from column name to aggregate methods. agg करने के लिए agg की agg विधि का उपयोग कर रहा है Compute aggregates by specifying a map from column name to aggregate methods.first एक एसक्यूएल एकत्रीकरण कार्य है

हम रैंक () विंडो फ़ंक्शन का उपयोग कर सकते हैं (जहां आप रैंक = 1 का चयन करेंगे) रैंक सिर्फ एक समूह की प्रत्येक पंक्ति के लिए संख्या जोड़ता है (इस स्थिति में यह समय होगा)

यहाँ एक उदाहरण है ( https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank से )

 val dataset = spark.range(9).withColumn("bucket", 'id % 3) import org.apache.spark.sql.expressions.Window val byBucket = Window.partitionBy('bucket).orderBy('id) scala> dataset.withColumn("rank", rank over byBucket).show +---+------+----+ | id|bucket|rank| +---+------+----+ | 0| 0| 1| | 3| 0| 2| | 6| 0| 3| | 1| 1| 1| | 4| 1| 2| | 7| 1| 3| | 2| 2| 1| | 5| 2| 2| | 8| 2| 3| +---+------+----+