दिलचस्प पोस्ट
क्या यह PHP में ओवरलोड ऑपरेटरों के लिए संभव है? ज़क्सिंग का उपयोग करते हुए QR कोड एन्कोडिंग और डिकोडिंग जावा में उद्धरण के बीच डेटा कैसे प्राप्त करें? सभी तालिकाओं की खोज करें, किसी विशिष्ट मान के लिए सभी कॉलम SQL सर्वर "Java.security.cert.CertificateException: कोई विषय वैकल्पिक नाम मौजूद नहीं" त्रुटि कैसे ठीक करें? पायथन के उपयोग सेलेनियम वेबड्राइवर में वेब एलेमेंट का HTML स्रोत प्राप्त करें jQuery टेम्पलेटिंग इंजन Php / mysqli में संग्रहित प्रक्रिया के साथ एकाधिक परिणाम सेट पुनर्प्राप्त करना क्या मुझे घोषणा या कंस्ट्रक्टर पर इंस्टेंस चर को इन्स्तांत करना चाहिए? ब्रैकेट (“) और डॉट (`.`) नोटेशन के बीच अंतर मैं रनटाइम पर एक क्लास पर एक विशेषता कैसे पढ़ूं? उपयोगकर्ता जनरेट किए गए HTML के अंदर जावास्क्रिप्ट इंजेक्शन हमलों को कैसे रोकें हॉटमेल, जीमेल और याहू के लिए PHP एपीआई? मानक 10 अंकीय फोन नंबर से मेल खाने के लिए नियमित अभिव्यक्ति PHP में समय अपलोड अपलोड कर सकता है?

स्पार्क में डेटाफ्रेम के लिए आरडीड ऑब्जेक्ट कैसे परिवर्तित करें

मैं एक आरडीडी ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] org.apache.spark.sql.Row org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] ) को एक डाटाफ्रेम org.apache.spark.sql.DataFrame में कैसे रूपांतरित कर सकता / सकती हूं। मैं .rdd का उपयोग करने के लिए एक .rdd । इसे संसाधित करने के बाद मैं इसे डाटाफ्रेम में वापस करना चाहता हूं मैं यह कैसे कर सकता हूँ ?

वेब के समाधान से एकत्रित समाधान "स्पार्क में डेटाफ्रेम के लिए आरडीड ऑब्जेक्ट कैसे परिवर्तित करें"

SqlContext में कई createDataFrame विधियां हैं जो एक RDD दिए गए DataFrame बनाते हैं। मैं सोचता हूं कि इनमें से एक आपके संदर्भ के लिए काम करेगा।

उदाहरण के लिए:

 def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame 

दिए गए स्कीमा का उपयोग करते हुए एक आरडीडी से डेटाफ़्रेम बनाते हैं।

अपने आरडीडी [पंक्ति] को मानते हुए आपको आरडीडी कहा जाता है, आप इसका उपयोग कर सकते हैं:

 val sqlContext = new SQLContext(sc) import sqlContext.implicits._ rdd.toDF() 

यह कोड स्काला 2.11 से स्पार्क 2.x से पूरी तरह से काम करता है

आवश्यक वर्गों को आयात करें

 import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} 

SparkSession ऑब्जेक्ट बनाएं, यहां spark

 val spark: SparkSession = SparkSession.builder.master("local").getOrCreate val sc = spark.sparkContext // Just used to create test RDDs 

चलो इसे DataFrame बनाने के लिए DataFrame

 val rdd = sc.parallelize( Seq( ("first", Array(2.0, 1.0, 2.1, 5.4)), ("test", Array(1.5, 0.5, 0.9, 3.7)), ("choose", Array(8.0, 2.9, 9.1, 2.5)) ) ) 

विधि 1

SparkSession.createDataFrame(RDD obj) का उपयोग करना

 val dfWithoutSchema = spark.createDataFrame(rdd) dfWithoutSchema.show() +------+--------------------+ | _1| _2| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+ 

विधि 2

SparkSession.createDataFrame(RDD obj) का उपयोग करना और स्तंभ नामों को निर्दिष्ट करना।

 val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals") dfWithSchema.show() +------+--------------------+ | id| vals| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+ 

विधि 3 (प्रश्न का वास्तविक उत्तर)

इस तरह इनपुट rdd RDD[Row] का होना चाहिए।

 val rowsRdd: RDD[Row] = sc.parallelize( Seq( Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9) ) ) 

स्कीमा बनाएं

 val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val1", DoubleType, true)) .add(StructField("val2", DoubleType, true)) 

अब दोनों rowsRdd लागू rowsRdd और createDataFrame() बनाने के createDataFrame()

 val df = spark.createDataFrame(rowsRdd, schema) df.show() +------+----+----+ | id|val1|val2| +------+----+----+ | first| 2.0| 7.0| |second| 3.5| 2.5| | third| 7.0| 5.9| +------+----+----+ 

मान लें कि आपके पास DataFrame और आप DataFrame RDD[Row] परिवर्तित करके फ़ील्ड डेटा पर कुछ संशोधन करना चाहते हैं।

 val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head)) 

DataFrame से वापस DataFrame को कनवर्ट करने के लिए DataFrame के संरचना प्रकार को परिभाषित करने की आवश्यकता है

यदि डेटाटाइप Long था तो यह संरचना में LongType रूप में हो जाएगा।

यदि String StringType संरचना में है

 val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true))) 

अब आप createDataFrame विधि का उपयोग कर RDD को DataFrame में कनवर्ट कर सकते हैं।

 val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct) 

नोट: यह उत्तर मूल रूप से यहां पोस्ट किया गया था

मैं इस उत्तर को पोस्ट कर रहा हूं क्योंकि मैं उपलब्ध विकल्पों के बारे में अतिरिक्त विवरण साझा करना चाहूंगा, जो मुझे दूसरे उत्तरों में नहीं मिले


पंक्तियों के आरडीडी से एक डाटाफ्रेम बनाने के लिए, दो मुख्य विकल्प होते हैं:

1) जैसा कि पहले से ही बताया गया है, आप toDF() को इस्तेमाल कर सकते हैं जो import sqlContext.implicits._import sqlContext.implicits._ द्वारा import sqlContext.implicits._ किया जा सकता है। हालांकि, यह दृष्टिकोण केवल निम्नलिखित प्रकार के RDD के लिए काम करता है:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(स्रोत: SQL कोंटेन्टेक्स । SQLContext.implicits ऑब्जेक्ट के SQLContext.implicits )

अंतिम हस्ताक्षर वास्तव में इसका अर्थ है कि यह ट्यूपल्स के scala.Product या केस क्लासेस का scala.Product (क्योंकि ट्यूपल्स और केस क्लास scala.Product के scala.Product उप-वर्ग हैं) के लिए काम कर सकते हैं।

इसलिए, एक RDD[Row] लिए इस दृष्टिकोण का उपयोग करने के लिए, आपको इसे RDD[T <: scala.Product] । यह प्रत्येक पंक्ति को एक कस्टम केस वर्ग या एक ट्यूपल पर मैप करने से, निम्न कोड स्निपेट के अनुसार किया जा सकता है:

 val df = rdd.map({ case Row(val1: String, ..., valN: Long) => (val1, ..., valN) }).toDF("col1_name", ..., "colN_name") 

या

 case class MyClass(val1: String, ..., valN: Long = 0L) val df = rdd.map({ case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN) }).toDF("col1_name", ..., "colN_name") 

इस दृष्टिकोण का मुख्य दोष (मेरी राय में) यह है कि आपको स्पष्ट रूप से परिणामी डेटाफ्रेम के स्कीमा को मानचित्र फ़ंक्शन में, स्तंभ द्वारा कॉलम सेट करना होगा। हो सकता है कि यह प्रोग्रामिक रूप से किया जा सकता है अगर आपको स्कीमा अग्रिम में नहीं पता है, लेकिन चीजें थोड़ी गड़बड़ कर सकती हैं। इसलिए, वैकल्पिक रूप से, एक और विकल्प है:


2) आप createDataFrame(rowRDD: RDD[Row], schema: StructType) का उपयोग स्वीकार किए जाते हैं, जो कि SQLContext ऑब्जेक्ट में उपलब्ध है। पुराने डेटाफ्रेम के RDD को बदलने के लिए उदाहरण:

 val rdd = oldDF.rdd val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema) 

ध्यान दें कि स्कीमा स्तंभ को स्पष्ट रूप से सेट करने की कोई आवश्यकता नहीं है। हम पुराने डीएफ स्कीमा का पुन: उपयोग करते हैं, जो कि StructType क्लास का है और आसानी से बढ़ाया जा सकता है। हालांकि, यह दृष्टिकोण कभी-कभी संभव नहीं है, और कुछ मामलों में पहले एक से कम कुशल हो सकता है।

यहां स्पार्क RDD में अपनी सूची को परिवर्तित करने और फिर स्पार्क RDD को डेटाफ्रेम में परिवर्तित करने का एक सरल उदाहरण है।

कृपया ध्यान दें कि मैंने स्पार्क-शेल का उपयोग REPL को निम्नलिखित कोड को निष्पादित करने के लिए किया है, यहां स्परकॉन्टेक्स का एक उदाहरण है जो स्पर्क-शेल में निहित रूप से उपलब्ध है। आशा है कि यह आपके प्रश्न का उत्तर देगा

 scala> val numList = List(1,2,3,4,5) numList: List[Int] = List(1, 2, 3, 4, 5) scala> val numRDD = sc.parallelize(numList) numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28 scala> val numDF = numRDD.toDF numDF: org.apache.spark.sql.DataFrame = [_1: int] scala> numDF.show +---+ | _1| +---+ | 1| | 2| | 3| | 4| | 5| +---+ 

विधि 1: (स्कला)

 val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z") 

विधि 2: (स्कला)

 case class temp(val1: String,val3 : Double) val rdd = sc.parallelize(Seq( Row("foo", 0.5), Row("bar", 0.0) )) val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF() rows.show() 

विधि 1: (पायथन)

 from pyspark.sql import Row l = [('Alice',2)] Person = Row('name','age') rdd = sc.parallelize(l) person = rdd.map(lambda r:Person(*r)) df2 = sqlContext.createDataFrame(person) df2.show() 

विधि 2: (पायथन)

 from pyspark.sql.types import * l = [('Alice',2)] rdd = sc.parallelize(l) schema = StructType([StructField ("name" , StringType(), True) , StructField("age" , IntegerType(), True)]) df3 = sqlContext.createDataFrame(rdd, schema) df3.show() 

पंक्ति ऑब्जेक्ट से मान निकाला और फिर डीएफ़ को आरडीपी में कनवर्ट करने के लिए केस क्लास लागू किया

 val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" } val temp2 = attrib2.map{case Row ( key: Int) => s"$key" } case class RLT (id: String, attrib_1 : String, attrib_2 : String) import hiveContext.implicits._ val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF 

डेटाफ्रेम बनाने के चार तरीके हैं

  1. डेटाफ्रेम एपीआई
  2. प्रोग्राममिक रूप से निर्दिष्ट स्कीमा
  3. केस वर्ग
  4. toDF () विधि अधिक जानकारी: https://www.youtube.com/watch?v=JtJeEEzEu1NU

डीडीएफ () पर लागू होने से पहले, डीडीएफ () विधि में उपयोग करने के लिए सरल तरीके से, आपको केवल डीडीएफ () विधि पर लागू होने से संरचित प्रारूप बनाना होगा। मुझे लगता है कि यह वीडियो आपकी सहायता करेगा https://www.youtube.com/watch?v=nsbjzpbCJV4

 One needs to create a schema, and attach it to the Rdd. 

Val स्पार्क मानते हुए स्पार्कसेशन। बिल्डर का एक उत्पाद है …

  import org.apache.spark._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ /* Lets gin up some sample data: * As RDD's and dataframes can have columns of differing types, lets make our * sample data a three wide, two tall, rectangle of mixed types. * A column of Strings, a column of Longs, and a column of Doubules */ val arrayOfArrayOfAnys = Array.ofDim[Any](2,3) arrayOfArrayOfAnys(0)(0)="aString" arrayOfArrayOfAnys(0)(1)=0L arrayOfArrayOfAnys(0)(2)=3.14159 arrayOfArrayOfAnys(1)(0)="bString" arrayOfArrayOfAnys(1)(1)=9876543210L arrayOfArrayOfAnys(1)(2)=2.71828 /* The way to convert an anything which looks rectangular, * (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to * throw it into sparkContext.parallelize. * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows * the parallelize definition as * def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism) * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys. * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. */ val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys) /* We'll be using the sqlContext.createDataFrame to add a schema our RDD. * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have. * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq) * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. */ val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=> Row.fromSeq(f.toSeq) ) /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe. * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as * case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty) * Will leave the two default values in place for each of the columns: * nullability as true, * metadata as an empty Map[String,Any] * */ val schema = StructType( StructField("colOfStrings", StringType) :: StructField("colOfLongs" , LongType ) :: StructField("colOfDoubles", DoubleType) :: Nil ) val df=spark.sqlContext.createDataFrame(rddOfRows,schema) /* * +------------+----------+------------+ * |colOfStrings|colOfLongs|colOfDoubles| * +------------+----------+------------+ * | aString| 0| 3.14159| * | bString|9876543210| 2.71828| * +------------+----------+------------+ */ df.show 

वही कदम, लेकिन कम वाल घोषणाओं के साथ:

  val arrayOfArrayOfAnys=Array( Array("aString",0L ,3.14159), Array("bString",9876543210L,2.71828) ) val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq)) /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata: * Consider constructing the schema from an Array[StructField]. This would allow looping over * the columns, with a match statement applying the appropriate sql datatypes as the second * StructField arguments. */ val sf=new Array[StructField](3) sf(0)=StructField("colOfStrings",StringType) sf(1)=StructField("colOfLongs" ,LongType ) sf(2)=StructField("colOfDoubles",DoubleType) val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList)) df.show 

स्पार्क के नए संस्करण (2.0+) पर यह एक उपलब्ध sqlcontext के बिना भी काम करेगा।

 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ val spark = SparkSession .builder() .getOrCreate() import spark.implicits._ val dfSchema = Seq("col1", "col2", "col3") rdd.toDF(dfSchema: _*)