दिलचस्प पोस्ट
एसक्यूएल शामिल है: एक-से-कई रिश्ते में अंतिम अभिलेख का चयन करना रिटर्निंग आईईइन्मेरेबल <T> बनाम IQueryable <T> मैं खिड़कियों के अनुप्रयोग में एक स्क्रीनशॉट कैसे ले सकता हूं? सी ++ ओवरलोड संकल्प रिकार्ड ध्वनि क्लिप के साथ एंड्रॉइड पर आवाज मान्यता? DOM ईवेंट प्रतिनिधिमंडल क्या है? आप Node.js में डाक डेटा कैसे निकाल सकते हैं? MVVM के साथ कई चयन प्रबंधित करना जावा कमांड लाइन तर्क ब्राउज़र जावास्क्रिप्ट स्टैक आकार की सीमा सी में एक +++++ बी काम क्यों नहीं करता? तारीखों को मान्य करने के लिए कैसे regex लिखना है? त्रुटि को कैसे ठीक करें: NodeJs का उपयोग करते समय EADDRINUSE सुनें? स्विफ्ट में एक वैकल्पिक मूल्य क्या है? std :: chrono :: high_resolution_clock का संकल्प माप के अनुरूप नहीं है

स्पार्क आरडीडी और / या स्पार्क डेटाफ़्रेम में डेटा को फिर से स्थानांतरित / पिवट करना

मेरे पास निम्न प्रारूप में कुछ डेटा है (या तो RDD या स्पार्क डेटाफ़्रेम):

from pyspark.sql import SQLContext sqlContext = SQLContext(sc) rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) # convert to a Spark DataFrame schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlContext.createDataFrame(rdd, schema) 

मैं क्या करना चाहूंगा, डेटा को 'reshape' करने के लिए, देश (विशेष रूप से अमेरिका, यूके और सीए) में कॉलम में कुछ पंक्तियां परिवर्तित करें:

 ID Age US UK CA 'X01' 41 3 1 2 'X02' 72 4 6 7 

मूलतः, मुझे पायथन के pivot कार्यप्रवाह की तर्ज पर कुछ की आवश्यकता है:

 categories = ['US', 'UK', 'CA'] new_df = df[df['Country'].isin(categories)].pivot(index = 'ID', columns = 'Country', values = 'Score') 

मेरा डेटासेट बड़ा नहीं है, इसलिए मैं वास्तव में collect() नहीं कर सकता और डेटा को मेमोरी में निगलना पायथन में खुद को फिर से करना पायथन की .pivot() को किसी आरडीडी या स्पार्क डेटाफ्रेम के मानचित्रण करते समय एक इनकॉक्लेबल फ़ंक्शन में परिवर्तित करने का कोई तरीका है? किसी भी सहायता की सराहना की जाएगी!

वेब के समाधान से एकत्रित समाधान "स्पार्क आरडीडी और / या स्पार्क डेटाफ़्रेम में डेटा को फिर से स्थानांतरित / पिवट करना"

स्पार्क 1.6 के बाद से आप GroupedData पर pivot फ़ंक्शन का उपयोग कर सकते हैं और समग्र अभिव्यक्ति प्रदान कर सकते हैं।

 pivoted = (df .groupBy("ID", "Age") .pivot( "Country", ['US', 'UK', 'CA']) # Optional list of levels .sum("Score")) # alternatively you can use .agg(expr)) pivoted.show() ## +---+---+---+---+---+ ## | ID|Age| US| UK| CA| ## +---+---+---+---+---+ ## |X01| 41| 3| 1| 2| ## |X02| 72| 4| 6| 7| ## +---+---+---+---+---+ 

स्तरों को छोड़ा जा सकता है लेकिन यदि प्रदान किया जाता है तो प्रदर्शन को बढ़ावा देता है और आंतरिक फ़िल्टर के रूप में काम करता है।

यह विधि अभी भी अपेक्षाकृत धीमी है लेकिन मैन्युअल रूप से मैन्युअल पासिंग डेटा जेवीएम और पायथन के बीच मैन्युअल रूप से धड़कता है।

सबसे पहले, यह संभवतः एक अच्छा विचार नहीं है, क्योंकि आपको कोई अतिरिक्त जानकारी नहीं मिल रही है, लेकिन आप एक निश्चित स्कीमा के साथ खुद को बाध्य कर रहे हैं (यानी आपको यह जानने की आवश्यकता है कि आप कितने देशों की उम्मीद कर रहे हैं, और ज़ाहिर है, अतिरिक्त देश का मतलब है कोड में परिवर्तन)

यह कहने के बाद, यह एक एसक्यूएल समस्या है, जो नीचे दिखाया गया है। लेकिन अगर आप मानते हैं कि यह "सॉफ्टवेयर की तरह" भी नहीं है (गंभीरता से, मैंने सुना है !!), तो आप पहले समाधान का उल्लेख कर सकते हैं

समाधान 1:

 def reshape(t): out = [] out.append(t[0]) out.append(t[1]) for v in brc.value: if t[2] == v: out.append(t[3]) else: out.append(0) return (out[0],out[1]),(out[2],out[3],out[4],out[5]) def cntryFilter(t): if t[2] in brc.value: return t else: pass def addtup(t1,t2): j=() for k,v in enumerate(t1): j=j+(t1[k]+t2[k],) return j def seq(tIntrm,tNext): return addtup(tIntrm,tNext) def comb(tP,tF): return addtup(tP,tF) countries = ['CA', 'UK', 'US', 'XX'] brc = sc.broadcast(countries) reshaped = calls.filter(cntryFilter).map(reshape) pivot = reshaped.aggregateByKey((0,0,0,0),seq,comb,1) for i in pivot.collect(): print i 

अब, समाधान 2: बेशक बेहतर है कि एसक्यूएल इस के लिए सही उपकरण है

 callRow = calls.map(lambda t: Row(userid=t[0],age=int(t[1]),country=t[2],nbrCalls=t[3])) callsDF = ssc.createDataFrame(callRow) callsDF.printSchema() callsDF.registerTempTable("calls") res = ssc.sql("select userid,age,max(ca),max(uk),max(us),max(xx)\ from (select userid,age,\ case when country='CA' then nbrCalls else 0 end ca,\ case when country='UK' then nbrCalls else 0 end uk,\ case when country='US' then nbrCalls else 0 end us,\ case when country='XX' then nbrCalls else 0 end xx \ from calls) x \ group by userid,age") res.show() 

डेटा सेट अप:

 data=[('X01',41,'US',3),('X01',41,'UK',1),('X01',41,'CA',2),('X02',72,'US',4),('X02',72,'UK',6),('X02',72,'CA',7),('X02',72,'XX',8)] calls = sc.parallelize(data,1) countries = ['CA', 'UK', 'US', 'XX'] 

परिणाम:

1 समाधान से

 (('X02', 72), (7, 6, 4, 8)) (('X01', 41), (2, 1, 3, 0)) 

2 समाधान से:

 root |-- age: long (nullable = true) |-- country: string (nullable = true) |-- nbrCalls: long (nullable = true) |-- userid: string (nullable = true) userid age ca uk us xx X02 72 7 6 4 8 X01 41 2 1 3 0 

कृपया मुझे बताएं कि यह काम करता है या नहीं 🙂

बेस्ट आयन

यहां एक मूल स्पार्क दृष्टिकोण है जो स्तंभ नामों को मुश्किल नहीं करता है। यह aggregateByKey पर आधारित है, और प्रत्येक कुंजी के लिए दिखाई देने वाले कॉलमों को एकत्र करने के लिए एक शब्दकोश का उपयोग करता है। फिर हम अंतिम डेटाफ्रेम बनाने के लिए सभी स्तंभ नाम इकट्ठा करते हैं। [पूर्व संस्करण प्रत्येक रिकार्ड के लिए एक शब्दकोश का उत्सर्जन करने के बाद jsonRDD का उपयोग किया गया था, लेकिन यह अधिक कुशल है।] स्तंभों की एक विशिष्ट सूची पर सीमित या XX जैसे लोगों को छोड़कर एक आसान संशोधन होगा

प्रदर्शन काफी बड़ी तालिकाओं पर अच्छा लगता है मैं एक भिन्नता का उपयोग कर रहा हूं जो कि समय की संख्या की गणना करता है कि हर आईडी के लिए घटनाओं की एक चर संख्या प्रत्येक ईवेंट के लिए एक कॉलम उत्पन्न करती है। यह कोड मूल रूप से एक ही है, इसके अलावा यह एक संग्रह का उपयोग करता है। घटनाओं की गिनती के लिए seqFn में एक seqFn बजाय seqFn

 from pyspark.sql.types import * rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlCtx.createDataFrame(rdd, schema) def seqPivot(u, v): if not u: u = {} u[v.Country] = v.Score return u def cmbPivot(u1, u2): u1.update(u2) return u1 pivot = ( df .rdd .keyBy(lambda row: row.ID) .aggregateByKey(None, seqPivot, cmbPivot) ) columns = ( pivot .values() .map(lambda u: set(u.keys())) .reduce(lambda s,t: s.union(t)) ) result = sqlCtx.createDataFrame( pivot .map(lambda (k, u): [k] + [u.get(c) for c in columns]), schema=StructType( [StructField('ID', StringType())] + [StructField(c, IntegerType()) for c in columns] ) ) result.show() 

पैदा करता है:

 ID CA UK US XX X02 7 6 4 8 X01 2 1 3 null 

तो पहले बंद, मुझे यह सुधार आपके आरडीडी (जो आपके वास्तविक आउटपुट से मेल खाता है) में करना था:

 rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) 

एक बार जब मैंने यह सुधार किया, तो यह चाल थी:

 df.select($"ID", $"Age").groupBy($"ID").agg($"ID", first($"Age") as "Age") .join( df.select($"ID" as "usID", $"Country" as "C1",$"Score" as "US"), $"ID" === $"usID" and $"C1" === "US" ) .join( df.select($"ID" as "ukID", $"Country" as "C2",$"Score" as "UK"), $"ID" === $"ukID" and $"C2" === "UK" ) .join( df.select($"ID" as "caID", $"Country" as "C3",$"Score" as "CA"), $"ID" === $"caID" and $"C3" === "CA" ) .select($"ID",$"Age",$"US",$"UK",$"CA") 

आपके धुरी के रूप में नहीं के रूप में लगभग सुरुचिपूर्ण, सुनिश्चित करने के लिए

बस पेट्रोरस्री के बहुत ही उपयोगी उत्तर पर कुछ टिप्पणियां:

  • स्तंभ आयु गायब है, इसलिए बस यू ["आयु"] = v.Age फ़ंक्शन के लिए जोड़ें seqPivot
  • यह पता चला कि कॉलम के तत्वों पर दोनों छोरों ने एक अलग क्रम में तत्व दिए। स्तंभों के मूल्य सही थे, लेकिन उनमें से नाम नहीं थे। इस व्यवहार से बचने के लिए बस कॉलम सूची का आदेश दें

यहां थोड़ा संशोधित कोड है:

 from pyspark.sql.types import * rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlCtx.createDataFrame(rdd, schema) # u is a dictionarie # v is a Row def seqPivot(u, v): if not u: u = {} u[v.Country] = v.Score # In the original posting the Age column was not specified u["Age"] = v.Age return u # u1 # u2 def cmbPivot(u1, u2): u1.update(u2) return u1 pivot = ( rdd .map(lambda row: Row(ID=row[0], Age=row[1], Country=row[2], Score=row[3])) .keyBy(lambda row: row.ID) .aggregateByKey(None, seqPivot, cmbPivot) ) columns = ( pivot .values() .map(lambda u: set(u.keys())) .reduce(lambda s,t: s.union(t)) ) columns_ord = sorted(columns) result = sqlCtx.createDataFrame( pivot .map(lambda (k, u): [k] + [u.get(c, None) for c in columns_ord]), schema=StructType( [StructField('ID', StringType())] + [StructField(c, IntegerType()) for c in columns_ord] ) ) print result.show() 

अंत में, आउटपुट होना चाहिए

 +---+---+---+---+---+----+ | ID|Age| CA| UK| US| XX| +---+---+---+---+---+----+ |X02| 72| 7| 6| 4| 8| |X01| 41| 2| 1| 3|null| +---+---+---+---+---+----+ 

पीआईवीओटी के लिए एक मूल्य के लिए एक बड़ा मामला बयान के बिना, यह मूल रूप से करने के लिए एक जीआईआरए है।

https://issues.apache.org/jira/browse/HIVE-3776

कृपया वोट करें कि जीरा को ऊपर उठाएं ताकि इसे जल्द ही लागू किया जा सके। एक बार जब यह हाइव एसक्यूएल में होता है, तो स्पार्क में आमतौर पर बहुत पीछे की कमी नहीं होती है और अंत में इसे स्पार्क में भी लागू किया जाएगा।