दिलचस्प पोस्ट
अजगर के रूप में एक सूची बनाओ (वर्तमान, अगले) पायथन में एकाधिक एलएचएस के साथ एक रैखिक मॉडल फिटिंग एसक्यूएल में, आप श्रेणियों में "समूह" कैसे कर सकते हैं? कैसे mysql में नाम स्ट्रिंग विभाजित करने के लिए? सभी संदर्भों के साथ अनुक्रम को कैसे लोड करने के लिए विधानसभा को लोड करें? कैसे एक div के अंदर खड़ी एक छवि को संरेखित करें? टंकिनर पाठ विजेट में टेक्स्ट को कैसे उजागर करें एचटीएमएल ईमेल लिखते समय सर्वोत्तम अभ्यास और विचार SQL सर्वर में पंक्ति ऑफसेट विशिष्ट संपर्क प्रोग्राम को टेक्स्ट भेजें (व्हाट्सएप) क्या हैं और वे उपयोगी क्यों हैं? JQuery के बिना माता पिता के पूर्ण दायरे के बच्चे तत्व को मँडरा करते हुए ऑनसाइट पर रोकें इकाई फ़्रेमवर्क और SQL सर्वर देखें कैसे Base64 एन्कोडिंग में node.js करना है? विंडोज फोन 7 (WP7) क्लिक पर एक बटन का पृष्ठभूमि रंग बदलें

स्पार्क डेटाफ़्रेम का उपयोग करते हुए JSON डेटा कॉलम की क्वेरी कैसे करें?

मेरे पास एक सीसांद्रा टेबल है जो सादगी के लिए कुछ ऐसा दिखता है:

key: text jsonData: text blobData: blob 

मैं इसका उपयोग करके चिंगारी और स्पार्क-कासांद्रा-कनेक्टर का उपयोग करने के लिए एक बुनियादी डेटा फ्रेम बना सकता हूं:

 val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() 

मैं हालांकि अपने अंतर्निहित संरचना में JSON डेटा का विस्तार करने के लिए संघर्ष कर रहा हूँ। मैं अंततः जेएसएसएन स्ट्रिंग के भीतर के गुणों के आधार पर फ़िल्टर करना और ब्लॉब डेटा वापस करना चाहता हूं। कुछ jsonData.foo = "बार" और वापस ब्लॉबडाटा। क्या यह संभव है?

वेब के समाधान से एकत्रित समाधान "स्पार्क डेटाफ़्रेम का उपयोग करते हुए JSON डेटा कॉलम की क्वेरी कैसे करें?"

स्पार्क 2.1+

आप from_json फ़ंक्शन का उपयोग कर सकते हैं:

 import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types._ val schema = StructType(Seq( StructField("k", StringType, true), StructField("v", DoubleType, true) )) df.withColumn("jsonData", from_json($"jsonData", schema)) 

स्पार्क 1.6+

आप get_json_object उपयोग कर सकते हैं, जो एक स्तंभ और पथ लेता है:

 import org.apache.spark.sql.functions.get_json_object val exprs = Seq("k", "v").map( c => get_json_object($"jsonData", s"$$.$c").alias(c)) df.select($"*" +: exprs: _*) 

और फ़ील्ड को अलग-अलग स्ट्रिंग्स के लिए निकालता है, जिसे आगे अपेक्षित प्रकारों में जाँचा जा सकता है।

स्पार्क <= 1.5 :

क्या यह संभव है?

जहाँ तक मुझे पता है कि यह सीधे संभव नहीं है आप इस तरह से कुछ की कोशिश कर सकते हैं:

 val df = sc.parallelize(Seq( ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") )).toDF("key", "jsonData", "blobData") 

मुझे लगता है कि blob फ़ील्ड JSON में प्रदर्शित नहीं किया जा सकता। अन्यथा आप टैक्सी छोड़ते हैं और इसमें शामिल होने से:

 import org.apache.spark.sql.Row val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") val jsons = sqlContext.read.json(df.drop("blobData").map{ case Row(key: String, json: String) => s"""{"key": "$key", "jsonData": $json}""" }) val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") parsed.printSchema // root // |-- jsonData: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: double (nullable = true) // |-- key: long (nullable = true) // |-- blobData: string (nullable = true) 

एक वैकल्पिक (सस्ता, हालांकि अधिक जटिल) दृष्टिकोण एक JSD को पार्स करने के लिए एक यूडीएफ का उपयोग करना है और किसी struct या map कॉलम को आउटपुट करना है। उदाहरण के लिए ऐसा कुछ:

 import net.liftweb.json.parse case class KV(k: String, v: Int) val parseJson = udf((s: String) => { implicit val formats = net.liftweb.json.DefaultFormats parse(s).extract[KV] }) val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) parsed.show // +---+--------------------+------------------+----------+ // |key| jsonData| blobData|parsedJSON| // +---+--------------------+------------------+----------+ // | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| // | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| // +---+--------------------+------------------+----------+ parsed.printSchema // root // |-- key: string (nullable = true) // |-- jsonData: string (nullable = true) // |-- blobData: string (nullable = true) // |-- parsedJSON: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: integer (nullable = false) 

from_json फ़ंक्शन ठीक उसी प्रकार है जिसे आप ढूंढ रहे हैं। आपका कोड कुछ ऐसा दिखेगा:

 val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() //You can define whatever struct type that your json states val schema = StructType(Seq( StructField("key", StringType, true), StructField("value", DoubleType, true) )) df.withColumn("jsonData", from_json(col("jsonData"), schema)) 

अंतर्निहित JSON स्ट्रिंग है

 "{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}"; 

नीचे JSON को फ़िल्टर करने और Cassandra में आवश्यक डेटा लोड करने की स्क्रिप्ट है

  sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2") .write.format("org.apache.spark.sql.cassandra") .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name")) .mode(SaveMode.Append) .save()