दिलचस्प पोस्ट
PHP और mySQL: वर्ष 2038 बग: यह क्या है? इसे कैसे हल करें? Maven2 को निर्भरता को प्रतिलिपि / लक्ष्य में कॉपी करने के लिए बाध्य करें जावास्क्रिप्ट ऑब्जेक्ट को संदर्भ से नया वैरिएबल नहीं प्रतिलिपि कैसे करें? सार वर्ग बनाम इंटरफेस किसी प्रपत्र में इनपुट तत्व के अंदर आइकन डालें जावास्क्रिप्ट के माध्यम से ब्राउज़र में चयनित HTML प्राप्त करें जावा 'अंतिम' विधि: यह क्या वादा करता है? अब क्या विकल्प हैं कि Google वेब खोज एपीआई नापसंद कर दिया गया है? कोड के भीतर अजगर मॉड्यूल इंस्टॉल करना Android में "@ आईडी /" और "@ + id /" के बीच अंतर केंद्रीय विकल्प हाइबरनेट फ़ंक्शन में Excel VBA लोड वर्कशीट 'स्ट्रीम खोलने में असफल: अनुमति अस्वीकृत' त्रुटि – लैरवेल स्विंग के साथ कैनवास कैसे बनाया जाए? JQuery.ajax के बाद अनपेक्षित टोकन बृहदान्त्र JSON

डेटासेट में कस्टम ऑब्जेक्ट कैसे स्टोर करें?

परिचय स्पार्क डेटासेट के अनुसार:

जैसा कि हम स्पार्क 2.0 के लिए तत्पर हैं, हम विशेष रूप से डेटासेट के लिए कुछ रोमांचक सुधार की योजना बनाते हैं … कस्टम एन्कोडर्स – जब हम वर्तमान में विभिन्न प्रकार के एन्कोडर्स को स्वत: उत्पन्न करते हैं, तो हम कस्टम ऑब्जेक्ट के लिए एक एपीआई खोलना चाहते हैं।

और एक Dataset में कस्टम प्रकार को स्टोर करने का प्रयास निम्न त्रुटि की तरह होता है:

डेटासेट में संग्रहीत प्रकार के लिए एन्कोडर खोजने में असमर्थ आदिम प्रकार (इंट, स्ट्रिंग, इत्यादि) और उत्पाद प्रकार (केस क्लास) को sqlContext.implicits._ आयात के द्वारा समर्थित किया जाता है। अन्य प्रकारों को क्रमबद्ध करने के लिए समर्थन भविष्य के रिलीज में जोड़ा जाएगा

या:

Java.lang.UnsupportedOperationException: कोई एन्कोडर नहीं मिला ….

क्या कोई मौजूदा समाधान हैं?


ध्यान दें कि यह प्रश्न केवल एक समुदाय विकी उत्तर के लिए एक प्रविष्टि बिंदु के रूप में मौजूद है। प्रश्न और जवाब दोनों को अपडेट / सुधारने के लिए स्वतंत्र महसूस करें।

वेब के समाधान से एकत्रित समाधान "डेटासेट में कस्टम ऑब्जेक्ट कैसे स्टोर करें?"

अद्यतन करें

यह उत्तर अभी भी मान्य और सूचनात्मक है, हालाँकि चीजें अब 2.2 / 2.3 से बेहतर हैं, जो Set , Seq , Map , Date , Timestamp , और BigDecimal लिए अंतर्निर्मित एनकोडर समर्थन जोड़ती हैं। यदि आप केवल मामला वर्गों और सामान्य स्काला प्रकार के साथ प्रकार बनाने के लिए चिपकते हैं, तो आपको ठीक से SQLImplicits में निहित होना चाहिए


दुर्भाग्य से, इसके साथ मदद करने के लिए लगभग कुछ भी नहीं जोड़ा गया है। @since 2.0.0 या SQLImplicits.scala में @since 2.0.0 लिए खोजना ज्यादातर चीजों को आदिम प्रकारों (और केस वर्गों के कुछ tweaking) के साथ करने की तलाश करता है। इसलिए, पहली बात यह कहना है: वर्तमान में कस्टम क्लास एनकोडर्स के लिए कोई वास्तविक समर्थन नहीं है । इस तरह से, जो कुछ ऐसी चीजें हैं जो एक नौकरी के रूप में अच्छी तरह से करते हैं, जैसा कि हम कभी भी उम्मीद कर सकते हैं, जो कि हमारे वर्तमान में हमारे निपटान पर है। अपफ्रंट अस्वीकरण के रूप में: यह पूरी तरह से काम नहीं करेगा और मैं सभी सीमाओं को स्पष्ट और अग्रिम रूप से बनाने के लिए अपनी पूरी कोशिश करूँगा।

वास्तव में समस्या क्या है

जब आप एक डाटासेट बनाना चाहते हैं, तो स्पार्क "एक एन्कोडर की आवश्यकता होती है (आंतरिक स्पार्क एसक्यूएल प्रस्तुति से टाइप करने के लिए और टी टाइप की एक जेवीएम ऑब्जेक्ट को परिवर्तित करने के लिए) जो आम तौर पर SparkSession से अंतर्निहित रूप से निर्मित होता है, या स्थिर रूप से कॉल करके बनाया जा सकता है createDataset पर तरीकों "(बनाने के दस्तावेज़ पर डॉक्स से लिया गया) एक एन्कोडर फॉर्म Encoder[T] लेगा जहां T एन्कोडिंग है। पहला सुझाव import spark.implicits._ जोड़ना है। import spark.implicits._ (जो आपको ये अन्तर्निहित एनकोडर देता है) और दूसरा सुझाव एन्कोडर से संबंधित कार्यों के इस सेट का उपयोग करके स्पष्ट रूप से निहित एन्कोडर में उत्तीर्ण करना है।

नियमित कक्षाओं के लिए कोई एन्कोडर उपलब्ध नहीं है, इसलिए

 import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

आप निम्न अंतर्निहित संबंधित संकलन समय त्रुटि दे देंगे:

डेटासेट में संग्रहीत प्रकार के लिए एन्कोडर खोजने में असमर्थ आदिम प्रकार (इंट, स्ट्रिंग, इत्यादि) और उत्पाद प्रकार (केस क्लास) को sqlContext.implicits._ आयात के द्वारा समर्थित किया जाता है। अन्य प्रकारों को क्रमबद्ध करने के लिए समर्थन भविष्य के रिलीज में जोड़ा जाएगा

हालांकि, यदि आप किसी भी प्रकार को लपेटते हैं, तो आप किसी Product फैलाने वाले कुछ वर्ग में उपरोक्त त्रुटि प्राप्त करने के लिए इस्तेमाल करते हैं, तो त्रुटि को गलत ढंग से रनटाइम में देरी हो जाती है, इसलिए

 import spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3)))) 

संकलित करता है ठीक है, लेकिन रनटाइम पर विफल रहता है

java.lang.UnsupportedOperationException: MyObj के लिए कोई एन्कोडर नहीं मिला

इसके लिए इसका कारण यह है कि एनकोडर्स स्पार्क फिक्ससिट्स के साथ बनाता है वास्तव में केवल रनटाइम (स्काला रिलेस्ट्रेमेंट के माध्यम से) पर बना है इस मामले में, समय के संकलन पर सभी स्पार्क चेक यह है कि बाहरी क्लास Product (जो सभी केस क्लास करते हैं) फैलता है, और केवल रनटाइम पर एहसास होता है कि यह अभी भी नहीं जानता है कि MyObj साथ क्या करना है (यह समस्या तब होती है जब मैंने कोशिश की एक Dataset[(Int,MyObj)] बनाने के लिए Dataset[(Int,MyObj)] – स्पार्क MyObj समय तक MyObj पर MyObj तक इंतजार करता है MyObj ये केंद्रीय समस्याएं हैं जिन्हें तय करने की ज़रुरत है:

  • कुछ कक्षाएं जो रनटाइम में हमेशा दुर्घटना के बावजूद Product संकलन का विस्तार करती हैं और
  • नेस्टेड प्रकारों के लिए कस्टम एन्कोडर में गुजरने का कोई तरीका नहीं है (मेरे पास केवल MyObj लिए एन्कोडर को स्पार्क करने का कोई तरीका नहीं है, ऐसा तब पता है कि Wrap[MyObj] या (Int,MyObj) को कैसे एन्कोड करना है।

बस kryo उपयोग kryo

समाधान सभी का सुझाव है कि kryo एन्कोडर का उपयोग करना है

 import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

यह बहुत थकाऊ तेजी से हालांकि हो जाता है खासकर यदि आपका कोड सभी प्रकार के डेटासेट को जोड़ता है, इसमें शामिल हो रहा है, समूहिंग आदि। आप अतिरिक्त निहितार्थ के एक गुच्छा को खत्म कर देते हैं। तो, क्यों न केवल एक अन्तर्निहित करें जो यह स्वचालित रूप से करता है?

 import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct) 

और अब, ऐसा लगता है कि मैं लगभग कुछ भी जो मैं चाहता हूँ कर सकता हूँ (नीचे दिए गए उदाहरण spark-shell में काम नहीं करेंगे जहां spark-shell spark.implicits._ स्वतः आयात होता है)

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (di, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom! 

या लगभग समस्या यह है कि kryo का उपयोग करने के लिए केवल एक फ्लैट बाइनरी ऑब्जेक्ट के रूप में डेटासेट में हर पंक्ति को संग्रहीत करने के लिए स्पार्क की ओर जाता है। map , filter , जो कि पर्याप्त है, लेकिन कार्यप्रणालियों के लिए join तरह, स्पार्क वास्तव में इन्हें कॉलम में अलग करने की आवश्यकता है। d3 या d3 लिए स्कीमा का निरीक्षण करना, आप देखते हैं कि सिर्फ एक बाइनरी कॉलम है:

 d2.printSchema // root // |-- value: binary (nullable = true) 

ट्यूपल्स के लिए आंशिक समाधान

तो, Scala (अधिक 6.26.3 ओवरलोडिंग संकल्प में ) में implicits के जादू का उपयोग करके, मैं अपने आप में एक implications की एक श्रृंखला बना सकता हूँ जो कम से कम ट्यूप्ले के लिए संभव के रूप में एक नौकरी कर सकता है, और वर्तमान implicits के साथ अच्छी तरह से काम करेगा:

 import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these 

फिर, इन अटकलों के साथ सशस्त्र, मैं काम से ऊपर अपना उदाहरण बना सकता हूं, यद्यपि कुछ स्तंभ नामों के साथ

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (di ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") 

मैंने अभी तक यह नहीं सोचा है कि उन्हें नाम बदलने के बिना अपेक्षित टपल नाम ( _1 , _2 , …) डिफ़ॉल्ट रूप से कैसे प्राप्त करना है – अगर कोई अन्य इस के साथ खेलना चाहता है, तो वह वह जगह है जहां नाम "value" शुरू किया जाता है और यह वह जगह है जहां टपल नाम आमतौर पर जोड़े जाते हैं। हालांकि, मुख्य बिंदु यह है कि मेरे पास अब एक अच्छी संरचित स्कीमा है:

 d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) 

तो, संक्षेप में, यह समाधान:

  • ट्यूपल्स के लिए अलग-अलग कॉलम प्राप्त करने की अनुमति देता है (इसलिए हम ट्यूप्ले पर फिर से जुड़ सकते हैं, याय!)
  • हम फिर बस implicits पर भरोसा कर सकते हैं (ताकि हर जगह kryo में पारित होने की कोई जरूरत नहीं)
  • लगभग पूरी तरह से import spark.implicits._ के साथ संगत है। import spark.implicits._ (इसमें कुछ नाम शामिल हैं)
  • हमें kyro क्रमबद्ध बाइनरी कॉलम पर शामिल होने नहीं देता है, ये उन फ़ील्ड पर अकेले छोड़ दें जो हो सकता है
  • ट्यूपल स्तंभों में से कुछ को "मान" के नाम से बदलकर अप्रिय साइड-प्रभाव होता है (यदि आवश्यक हो, तो यह परिवर्तित करके पूर्ववत किया जा सकता है। .toDF , नए कॉलम के नामों को निर्दिष्ट करते हुए, और एक डाटासेट पर वापस परिवर्तित किया जाता है – और स्कीमा नाम संरक्षित होने लगते हैं शामिल होने के माध्यम से, जहां वे सबसे ज़रूरी हैं)।

सामान्य में कक्षाओं के लिए आंशिक समाधान

यह एक कम सुखद है और इसका कोई अच्छा समाधान नहीं है हालांकि, अब हमारे पास ऊपर टपल समाधान है, मेरे पास एक और जवाब से एक अन्तर्निर्मित रूपांतरण समाधान है, यह थोड़ा कम दर्दनाक होगा क्योंकि आप अपने अधिक जटिल वर्गों को ट्यूपले में बदल सकते हैं। फिर, डाटासेट बनाने के बाद, आप संभवतः डेटाफ्रेम दृष्टिकोण का उपयोग करके कॉलम का नाम बदल सकते हैं अगर सब ठीक हो जाता है, तो यह वास्तव में एक सुधार है क्योंकि अब मैं अपने वर्गों के क्षेत्र में शामिल हो सकता हूं। अगर मैं सिर्फ एक फ्लैट द्विआधारी kryo इस्तेमाल करता था जो संभव नहीं होता।

यहां एक उदाहरण दिया गया है जो सब कुछ देता है: मेरे पास एक क्लास MyObj जो कि प्रकार के क्षेत्रों, Int , java.util.UUID , और Set[String] । पहले खुद का ख्याल रखता है दूसरा, हालांकि मैं kryo का उपयोग कर सीरियल कर सकता था, अगर String रूप में संग्रहीत किया जाता है (क्योंकि kryo आमतौर पर कुछ है जिसके खिलाफ मैं शामिल होना चाहता हूं) अधिक उपयोगी होगा। तीसरा वास्तव में सिर्फ एक बाइनरी कॉलम में है।

 class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (oi, outoString, os) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3) 

अब, मैं इस मशीनरी का उपयोग करके एक अच्छी स्कीमा के साथ एक डाटासेट बना सकता हूं:

 val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded] 

और स्कीमा मुझे सही नामों के साथ मैं कॉलम दिखाता है और पहले दो दोनों चीजों के साथ जो मैं शामिल हो सकता हूं

 d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true) 
  1. जेनेरिक एनकोडर्स का उपयोग करना

    अब दो जेनेरिक kryo उपलब्ध हैं जो अब kryo और javaSerialization kryo लिए उपलब्ध हैं, जहां बाद में स्पष्ट रूप से वर्णित किया गया है:

    बेहद अक्षम और केवल अंतिम उपाय के रूप में उपयोग किया जाना चाहिए

    निम्नलिखित वर्ग को मान लें

     class Bar(i: Int) { override def toString = s"bar $i" def bar = i } 

    आप इनकोडर का उपयोग निहित एनकोडर जोड़कर कर सकते हैं:

     object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] } 

    जो निम्नानुसार एक साथ इस्तेमाल किया जा सकता है:

     object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } } 

    यह ऑब्जेक्ट्स binary कॉलम के रूप में संग्रहीत करता है, जब DataFrame परिवर्तित किया DataFrame तो आप स्कीमा निम्न करते हैं:

     root |-- value: binary (nullable = true) 

    विशिष्ट क्षेत्र के लिए kryo एन्कोडर का उपयोग करके ट्यूपल्स को सांकेतिक करना भी संभव है:

     val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary] 

    कृपया ध्यान दें कि हम यहाँ पर निहित एनकोडर पर निर्भर नहीं हैं, लेकिन एन्कोडर को स्पष्ट रूप से पास करते हैं, इसलिए यह सबसे अधिक संभावना है कि toDS पद्धति के साथ काम नहीं करेगा।

  2. अंतर्निहित रूपांतरण का उपयोग करना:

    उदाहरण के लिए एन्कोडेड और कस्टम वर्ग के प्रतिनिधित्व के बीच अंतर्निहित रूपांतरण प्रदान करें:

     object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } } 

संबंधित सवाल:

  • ऑप्शन प्रकार कन्स्ट्रक्टर के लिए एन्कोडर कैसे बनाया जाए, जैसे कि विकल्प [इंट]?

Spark2.0 में अधिक या कम काम करते हैं। और Kryo अभी भी अनुशंसित serialization विकल्प है।

आप स्पार्क-शेल के साथ निम्नलिखित उदाहरण देख सकते हैं

 scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old. 

अभी तक] वर्तमान गुंजाइश में कोई appropriate encoders नहीं थे इसलिए हमारे लोगों को binary वैल्यू के रूप में एन्कोड नहीं किया गया। लेकिन जब हम Kryo सीरियलाइजेशन का उपयोग करते हुए कुछ implicit एन्कोडर प्रदान करते हैं तो यह बदल जाएगा।

 // Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old. 

जावा बीन वर्ग के मामले में, यह उपयोगी हो सकता है

 import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass]) 

अब आप कस्टम डेटाफ्रेम के रूप में डेटाफ्रेम आसानी से पढ़ सकते हैं

 dataFrame.as[MyClass] 

यह एक कस्टम वर्ग एन्कोडर बनाएगा, न कि बाइनरी एक होगा

मेरे उदाहरण जावा में होंगे, लेकिन मुझे यह कल्पना नहीं है कि स्काला के अनुकूल होना मुश्किल है।

मैं आरडीडी Dataset<Fruit> को डीसेटसेट पर फर्क करने के लिए काफी सफल रहा हूं, स्पार्क। कूटडेटासेट और एन्कोडर्स.बीन का इस्तेमाल करते हैं जब तक Fruit एक सरल जावा बीन है ।

चरण 1: सरल जावा बीन बनाएँ

 public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out } 

मैं आदिम प्रकारों के साथ कक्षाओं में रहना चाहूंगा और स्ट्रिंग को फ़ील्ड के रूप में दबाएगा इससे पहले कि डॉटब्रिक्स लोग अपने एन्कोडर्स को बीफ़ करेंगे। यदि आपके नेस्टेड ऑब्जेक्ट के साथ एक क्लास है, तो अपने सभी क्षेत्रों के साथ एक और सरल जावा बीन बनायें, ताकि आप अपने सभी क्षेत्रों को चपटा कर सकें, ताकि आप जटिल प्रकार को सरल एक में मैप करने के लिए आरडीडी ट्रांसमेशन का उपयोग कर सकें। यकीन है कि यह थोड़ा अतिरिक्त काम है, लेकिन मुझे लगता है कि यह एक फ्लैट स्कीमा के साथ काम कर रहे प्रदर्शन पर बहुत मदद करेगा।

चरण 2: आरडीडी से अपना डाटासेट प्राप्त करें

 SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List<Fruit> fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList); RDD<Fruit> fruitRDD = fruitJavaRDD.rdd(); Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class); Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean); 

और वोला! नमकीन, कुल्ला, दोहराना