दिलचस्प पोस्ट
पायथन: सूची बनाम डेक के लिए तालिका देखें सी में बूलियन मान का उपयोग करना PHP के साथ HTTP कैश हेडर का उपयोग कैसे करें टुकड़ों का उपयोग करते समय एंड्रॉइड नेविगेशन ड्रॉवर छवि और अप कैरेट के बीच स्विच करना अधिकतम आकार जो एक सरणी को पकड़ सकता है? एंड्रॉइड के लिए ग्रहण में एडमॉब त्रुटि: configChanges संकेतक को सूचक की आवश्यकता विलंब के बीच का अंतर ()। वादा और वादा वास्तव में एक पुनरावृत्ति समारोह क्या है? कोर डंप फ़ाइल विश्लेषण तरीकों की घोषणा PHP में जनक तरीकों के साथ संगत होना चाहिए एंड्रॉइड अद्यतन 17 बाह्य जार के साथ असंगत लगता है क्या मैं एक फ़ील्ड में एकाधिक MySQL पंक्तियों को जोड़ सकता हूँ? समुच्चय और पीओडी और कैसे / क्यों वे विशेष हैं? एंड्रॉइड पर कुछ डिवाइसेज़ पर कैमेरा इंटेन्ट का उपयोग करके एक छवि कैद क्यों हो जाती है?

मैं एक RDD को दो या अधिक RDD में कैसे विभाजित कर सकता हूं?

मैं एक आरडीडी को दो या दो से अधिक आरडीडी में विभाजित करने का एक तरीका तलाश रहा हूं। सबसे नज़दीकी मैंने देखा है स्काला स्पार्क: कई आरडीडी में विभाजित संग्रह? जो अभी भी एक आरडीडी है

यदि आप एसएएस से परिचित हैं, तो ऐसा कुछ:

data work.split1, work.split2; set work.preSplit; if (condition1) output work.split1 else if (condition2) output work.split2 run; 

जिसके परिणामस्वरूप दो अलग-अलग डेटा सेट आए। मुझे लगता है कि परिणाम प्राप्त करने के लिए तत्काल जारी रहना होगा …

वेब के समाधान से एकत्रित समाधान "मैं एक RDD को दो या अधिक RDD में कैसे विभाजित कर सकता हूं?"

एकल रूपांतरण से कई आरडीडी उत्पन्न करने के लिए संभव नहीं है * यदि आप RDD को विभाजित करना चाहते हैं, तो आपको प्रत्येक विभाजन की स्थिति के लिए एक filter लागू करना होगा। उदाहरण के लिए:

 def even(x): return x % 2 == 0 def odd(x): return not even(x) rdd = sc.parallelize(range(20)) rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even)) 

यदि आपके पास केवल एक द्विआधारी स्थिति है और गणना करना महंगा है तो आप ऐसा कुछ पसंद कर सकते हैं:

 kv_rdd = rdd.map(lambda x: (x, odd(x))) kv_rdd.cache() rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys() rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys() 

इसका अर्थ केवल एक एकल परिभाषा के लिए होता है, लेकिन सभी डेटा पर अतिरिक्त पास की आवश्यकता होती है।

यह ध्यान रखना महत्वपूर्ण है कि जब तक इनपुट आरडीडी ठीक से कैश्ड किया जाता है और डेटा वितरण के संबंध में कोई अतिरिक्त धारणा नहीं होती है, तब दोहराया फिल्टर और लूप के साथ-पाश के बीच समय की जटिलता के मामले में कोई महत्त्वपूर्ण अंतर नहीं है।

एन तत्वों और एम स्थितियों के साथ परिचालन की संख्या आपको निष्पादित करने के लिए एन गुणा एम के स्पष्ट रूप से आनुपातिक है। लूप के मामले में यह (एन + एमएन) / 2 के करीब होना चाहिए और दोहराया फ़िल्टर बिल्कुल एनएम है लेकिन अंत में दिन यह ओ (एनएम) के अलावा और कुछ नहीं है। आप जेसन लेंेंडर्म के साथ मेरी चर्चा ** देख सकते हैं, कुछ पेशेवरों और विपक्षों के बारे में पढ़ सकते हैं।

बहुत उच्च स्तर पर आपको दो चीजों पर विचार करना चाहिए:

  1. स्पार्क परिवर्तन आलसी हैं, जब तक आप कोई क्रिया निष्पादित नहीं करते हैं, आपका आरडीडी आरम्भ नहीं होता है

    इससे क्या फर्क पड़ता है? मेरे उदाहरण पर वापस जा रहे हैं:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even)) 

    यदि बाद में मैं तय करता हूं कि मुझे केवल rdd_odd आवश्यकता है तो rdd_odd को अमल करने का कोई कारण नहीं है।

    यदि आप अपने एसएएस उदाहरण पर काम की गणना करने के लिए एक उदाहरण work.split2 तो आप को इनपुट डेटा और work.split1 दोनों को अमल में work.split1

  2. RDDs एक घोषणात्मक एपीआई प्रदान करते हैं। जब आप filter या map उपयोग करते हैं तो स्पार्क इंजन पर यह पूरी तरह से अपवाद है कि यह ऑपरेशन कैसे किया जाता है। जब तक परिवर्तनों को पारित किए जाने वाले कार्यों साइड इफेक्ट्स मुफ्त होते हैं, तब तक पूरी पाइपलाइन को अनुकूलित करने के लिए कई संभावनाएं पैदा होती हैं।

दिन के अंत में यह मामला अपने स्वयं के परिवर्तन को सही ठहराने के लिए पर्याप्त नहीं है।

फिल्टर पैटर्न के साथ यह नक्शा वास्तव में कोर स्पार्क में प्रयोग किया जाता है मेरा जवाब देखें कि कैसे स्पार्क्स आरडीडी.रैमैंडएसप्लिट ने वास्तव में randomSplit और randomSplit पद्धति पद्धति का एक प्रासंगिक हिस्सा विभाजित किया है ।

यदि एकमात्र लक्ष्य इनपुट पर एक विभाजन हासिल करना है, तो DataFrameWriter लिए partitionBy खंड का उपयोग करना संभव है, जो पाठ आउटपुट स्वरूप:

 def makePairs(row: T): (String, String) = ??? data .map(makePairs).toDF("key", "value") .write.partitionBy($"key").format("text").save(...) 

* स्पार्क में परिवर्तन के केवल 3 बुनियादी प्रकार हैं:

  • आरडीडी [टी] => आरडीडी [टी]
  • आरडीडी [टी] => आरडीडी [यू]
  • (आरडीडी [टी], आरडीडी [यू]) => आरडीडी [डब्ल्यू]

जहां टी, यू, डब्ल्यू या तो परमाणु प्रकार या उत्पादों / ट्यूपल्स (के, वी) हो सकते हैं। उपरोक्त के कुछ संयोजनों का उपयोग करते हुए किसी भी अन्य ऑपरेशन को व्यक्त करना है अधिक जानकारी के लिए आप मूल आरडीडी पेपर की जांच कर सकते हैं।

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** स्काला स्पार्क भी देखें : कई आरडीडी में विभाजित संग्रह?

उपर्युक्त अन्य पोस्टर के रूप में, कोई एकल, देशी आरडीडी रूपांतरण जो आरडीडी विभाजन नहीं करता है, लेकिन यहां कुछ "मल्टीप्लेक्स" संचालन हैं जो कि कई बार बिना पढ़े RDDs पर "विभाजन" की एक विस्तृत विविधता का कुशल रूप से अनुकरण कर सकते हैं:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

यादृच्छिक बंटवारे के लिए कुछ विधियां विशिष्ट हैं:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

तरीकों ओपन सोर्स सिलेक्स परियोजना से उपलब्ध हैं:

https://github.com/willb/silex

एक ब्लॉग पोस्ट समझाते हुए कि वे कैसे काम करते हैं:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

 def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } } } def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } } } 

जैसा कि कहीं और कहा गया है, इन तरीकों में गति के लिए स्मृति का एक ट्रेड-ऑफ शामिल होता है, क्योंकि वे संपूर्ण विभाजन परिणामों को "आज़ादी" के बजाय "बेसब्री से" कंप्यूटिंग द्वारा संचालित करते हैं। इसलिए, इन पद्धतियों के लिए बड़े विभाजनों पर स्मृति समस्याओं को चलाने के लिए संभव है, जहां अधिक पारंपरिक आलसी रूपांतरण नहीं होगा।

यदि आप यादृच्छिक एसप्लीट एपीआई कॉल का उपयोग कर आरडीडी को विभाजित करते हैं , तो आपको आरडीडी की एक सरणी वापस मिल जाती है।

यदि आप चाहते हैं कि 5 आरडीडी वापस आए, तो 5 वेट वैल्यू में पास करें।

जैसे

 val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4) val seedValue = 5 val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue) splitRDD(1).collect() res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100) 

एक ही तरीका है कि आपके फ़िल्टर की स्थिति के आधार पर डेटा को विभाजित करने के लिए एक कस्टम पार्टीशनर का उपयोग करना है यह Partitioner को विस्तारित करके और RangePartitioner Partitioner समान कुछ लागू करके हासिल किया जा सकता है।

एक नक्शा विभाजन का उपयोग तब तब किया जा सकता है जब सभी डेटा पढ़ने के बिना विभाजित RDD से कई RDDs का निर्माण किया जा सके।

 val filtered = partitioned.mapPartitions { iter => { new Iterator[Int](){ override def hasNext: Boolean = { if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) { false } else { iter.hasNext } } override def next():Int = iter.next() } 

बस पता है कि फ़िल्टर किए गए आरडीडी में विभाजन की संख्या विभाजित RDD की संख्या के समान होगी, ताकि इसे कम करने और खाली विभाजन को हटाने के लिए संगठित होना चाहिए।