दिलचस्प पोस्ट
LINQ को इकाईयों को 'डबल पार्स (सिस्टम। स्ट्रिंग)' विधि की पहचान नहीं है, और इस पद्धति को किसी स्टोर अभिव्यक्ति में अनुवाद नहीं किया जा सकता है "एडीबी शेल डीडसिस अलार्म" आउटपुट कैसे पढ़ा जाए शून्य / नल को आरक्षित करें / रीसेट करें Prolog – सूची में पुनरावृत्तियों गिनती क्या PHP5 ऑब्जेक्ट संदर्भ से पास हैं? एचटीएमएल चपलता पैक – पार्सिंग टेबल एक समान वितरण को एक सामान्य वितरण में परिवर्तित करना Windows पर स्पार्क कैसे सेट अप करें? एक इकाई संबंध मॉडल और एक संबंधपरक मॉडल के बीच अंतर क्या है? शब्दकोश में एक कुंजी का नाम बदलें पृष्ठभूमि कार्य से iOS 7 में स्थान प्रबंधक प्रारंभ करें हर दिन दोपहर एक सेवा को कैसे चलाने के लिए, और हर बूट पर ऑरेकल में सभी टेबल बाधाओं को अक्षम करें डबल-क्लिक इवेंट का पता लगाया जाने पर क्लिक / माउसअप ईवेंट को रद्द करने की आवश्यकता है MySQL के लिए स्ट्रिंग पायथन को बचें

मैं सीएसवी फ़ाइल को आरडीडी में कैसे रूपांतरित कर सकता हूं

मैं चिंगारी के लिए नया हूँ मैं सीएसवी रिकॉर्ड में विशेष डेटा पर कुछ परिचालन करना चाहता हूं।

मैं एक सीएसवी फ़ाइल पढ़ने और इसे RDD में परिवर्तित करने की कोशिश कर रहा हूँ। मेरे आगे के संचालन सीएसवी फ़ाइल में दिए गए शीर्षक पर आधारित हैं।

(टिप्पणियों से) यह मेरा कोड अभी तक है:

final JavaRDD<String> File = sc.textFile(Filename).cache(); final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(EOL.split(s)); } }); final String heading=lines.first().toString(); 

मैं इस तरह के शीर्ष लेख मान प्राप्त कर सकता हूँ मैं इसे प्रत्येक रिकॉर्ड में CSV फ़ाइल में मैप करना चाहता हूं।

 final String[] header=heading.split(" "); 

मैं इस तरह के शीर्ष लेख मान प्राप्त कर सकता हूँ मैं इसे प्रत्येक रिकॉर्ड में CSV फ़ाइल में मैप करना चाहता हूं।

जावा में मैं CSVReader record.getColumnValue(Column header) का उपयोग कर रहा हूं ताकि विशिष्ट मान मिल सके। मुझे इसके लिए कुछ इसी तरह की आवश्यकता है।

वेब के समाधान से एकत्रित समाधान "मैं सीएसवी फ़ाइल को आरडीडी में कैसे रूपांतरित कर सकता हूं"

एक सरलीकृत दृष्टिकोण के लिए हेडर को संरक्षित करने का एक तरीका होगा।

मान लें कि आपके पास एक फ़ाइल है सीसीवी:

 user, topic, hits om, scala, 120 daniel, spark, 80 3754978, spark, 1 

हम एक हेडर वर्ग को परिभाषित कर सकते हैं जो पहली पंक्ति का पार्स किए गए संस्करण का उपयोग करता है:

 class SimpleCSVHeader(header:Array[String]) extends Serializable { val index = header.zipWithIndex.toMap def apply(array:Array[String], key:String):String = array(index(key)) } 

हम सड़क के नीचे डेटा को संबोधित करने के लिए उस हेडर का उपयोग कर सकते हैं:

 val csv = sc.textFile("file.csv") // original file val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line val rows = data.filter(line => header(line,"user") != "user") // filter the header out val users = rows.map(row => header(row,"user") val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt) ... 

ध्यान दें कि header सरणी अनुक्रमणिका के लिए एक मेमनिक के साधारण मानचित्र से अधिक नहीं है। बहुत ज्यादा यह सब सरणी में तत्व के क्रमिक स्थान पर किया जा सकता है, जैसे user = row(0)

PS: स्काला में आपका स्वागत है 🙂

आप स्पार्क-सीएसवी लाइब्रेरी का उपयोग कर सकते हैं: https://github.com/databricks/spark-csv

यह सीधे दस्तावेज से है:

 import org.apache.spark.sql.SQLContext SQLContext sqlContext = new SQLContext(sc); HashMap<String, String> options = new HashMap<String, String>(); options.put("header", "true"); options.put("path", "cars.csv"); DataFrame df = sqlContext.load("com.databricks.spark.csv", options); 

सबसे पहले मुझे यह कहना चाहिए कि यदि आप अलग-अलग फाइलों में अपने हेडर डालते हैं तो यह बहुत आसान है – यह बड़े डेटा में सम्मेलन है

वैसे भी डैनियल का जवाब बहुत अच्छा है, लेकिन इसकी एक अक्षमता और बग है, इसलिए मैं अपना खुद का पोस्ट करने जा रहा हूं। अक्षमता यह है कि आपको यह देखने के लिए प्रत्येक रिकॉर्ड की जांच करने की आवश्यकता नहीं है कि क्या यह शीर्ष लेख है, आपको प्रत्येक विभाजन के लिए पहला रिकॉर्ड देखना होगा। बग यह है कि .split(",") का उपयोग कर आप अपवाद को फेंक सकते हैं या गलत कॉलम प्राप्त कर सकते हैं जब प्रविष्टियां रिक्त स्ट्रिंग हैं और रिकॉर्ड की शुरुआत या समाप्ति पर होती हैं – यह सही करने के लिए आपको उपयोग करने की आवश्यकता है .split(",", -1) तो यहाँ पूर्ण कोड है:

 val header = scala.io.Source.fromInputStream( hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration) .open(new hadoop.fs.Path(path))) .getLines.head val columnIndex = header.split(",").indexOf(columnName) sc.textFile(path).mapPartitions(iterator => { val head = iterator.next() if (head == header) iterator else Iterator(head) ++ iterator }) .map(_.split(",", -1)(columnIndex)) 

अंतिम अंक, लकड़ी की छत पर विचार करें यदि आप केवल कुछ कॉलमों को फ़िश करना चाहते हैं। या कम से कम एक लाज़िली मूल्यांकन किए गए विभाजन समारोह को लागू करने पर विचार करें यदि आपके पास विस्तृत पंक्तियाँ हैं

हम सीएसवी डेटा पढ़ने और लिखने के लिए नए डेटाफ़्रेम आरडीडी का उपयोग कर सकते हैं। NormRRDD पर डेटाफ़्रेम आरडीडी के कुछ फायदे हैं:

  1. डेटाफ्रेम आरडीडी सामान्य से अधिक तेजी से सामान्य है क्योंकि हम स्कीमा निर्धारित करते हैं और जो रनटाइम पर बहुत कुछ अनुकूलित करने में हमें मदद करते हैं और हमें महत्वपूर्ण प्रदर्शन लाभ प्रदान करते हैं।
  2. यहां तक ​​कि अगर सीएसवी में कॉलम बदल जाता है तो यह स्वचालित रूप से सही कॉलम लेगा क्योंकि हम कॉलम नंबर को कठोर नहीं कर रहे हैं जो डेटा को पाठ के रूप में पढ़ा हुआ है और फिर इसे विभाजित करते हैं और फिर डेटा प्राप्त करने के लिए कॉलम की संख्या का उपयोग करते हुए।
  3. कोड की कुछ पंक्तियों में आप सीधे सीएसवी फ़ाइल पढ़ सकते हैं

आपको इस पुस्तकालय की आवश्यकता होगी: इसे build.sbt में जोड़ें

 libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0" 

इसके लिए स्पार्क स्काला कोड:

 val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val csvInPath = "/path/to/csv/abc.csv" val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath) //format is for specifying the type of file you are reading //header = true indicates that the first line is header in it 

इसे से कुछ कॉलम लेकर सामान्य आरडीडी में बदलने के लिए और

 val rddData = df.map(x=>Row(x.getAs("colA"))) //Do other RDD operation on it 

आरडीडी को सीएसवी प्रारूप में सहेजना:

 val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true)))) aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp") 

चूंकि हेडर सही पर सेट है इसलिए हम सभी आउटपुट फाइलों में हैडर नाम प्राप्त कर रहे होंगे।

सीडीएवी को आरडीडी में कनवर्ट करने के लिए स्पार्क / स्काला का उपयोग करने वाला यह दूसरा उदाहरण है अधिक विस्तृत विवरण के लिए यह पोस्ट देखें।

 def main(args: Array[String]): Unit = { val csv = sc.textFile("/path/to/your/file.csv") // split / clean data val headerAndRows = csv.map(line => line.split(",").map(_.trim)) // get header val header = headerAndRows.first // filter out header (eh. just check if the first val matches the first header name) val data = headerAndRows.filter(_(0) != header(0)) // splits to map (header/value pairs) val maps = data.map(splits => header.zip(splits).toMap) // filter out the user "me" val result = maps.filter(map => map("user") != "me") // print result result.foreach(println) } 

मैं सीधे शीर्षक से हेडर को पढ़ने की सिफारिश करता हूं, स्पार्क के माध्यम से नहीं। इसके लिए दो कारण हैं: 1) यह एक पंक्ति है वितरित दृष्टिकोण के लिए कोई फायदा नहीं है 2) हमें चालक में इस रेखा की जरूरत है, कार्यकर्ता नोड्स नहीं।

यह कुछ इस तरह चलता है:

 // Ridiculous amount of code to read one line. val uri = new java.net.URI(filename) val conf = sc.hadoopConfiguration val fs = hadoop.fs.FileSystem.get(uri, conf) val path = new hadoop.fs.Path(filename) val stream = fs.open(path) val source = scala.io.Source.fromInputStream(stream) val header = source.getLines.head 

अब जब आप आरडीडी बनाते हैं तो आप शीर्ष लेख को छोड़ सकते हैं।

 val csvRDD = sc.textFile(filename).filter(_ != header) 

इसके बाद हम एक कॉलम से आरडीडी बना सकते हैं, उदाहरण के लिए:

 val idx = header.split(",").indexOf(columnName) val columnRDD = csvRDD.map(_.split(",")(idx)) 

एक अन्य विकल्प mapPartitionsWithIndex का उपयोग करना mapPartitionsWithIndex विधि के रूप में आप विभाजन सूचकांक संख्या और उस विभाजन के भीतर सभी पंक्तियों की एक सूची प्राप्त करेंगे। विभाजन 0 और पंक्ति 0 शीर्ष लेख होगा

 val rows = sc.textFile(path) .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) => val results = new ArrayBuffer[(String, Int)] var first = true while (rows.hasNext) { // check for first line if (index == 0 && first) { first = false rows.next // skip the first row } else { results += rows.next } } results.toIterator }, true) rows.flatMap { row => row.split(",") } 

इस बारे में कैसा है?

 val Delimeter = "," val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter)) 

मैं आपको सुझाव देना चाहता हूं

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

 JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); 

आपको इस उदाहरण में एक व्यक्ति को अपने फ़ाइल शीर्षलेख की कल्पना के साथ एक क्लास होना चाहिए और अपने डेटा को स्कीमा में जोड़ना होगा और इच्छित मानदंड प्राप्त करने के लिए मानदंड को लागू करना होगा।

मुझे लगता है कि आप उस सीएसवी को आरडीडी में लोड करने की कोशिश कर सकते हैं और फिर उस आरडीडी से एक डाटाफ्रेम बना सकते हैं, यहां डेटाफीम को आरडीएस से बनाने का दस्तावेज है: http://spark.apache.org/docs/latest/sql-programming-guide .html # अंतरसंक्रियता-साथ-rdds

स्पार्क स्कला के लिए मैं आमतौर पर उपयोग करता हूँ जब मैं स्पार्क सीएसवी पैकेज का उपयोग नहीं कर सकता …

 val sqlContext = new org.apache.spark.sql.SQLContext(sc) val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv") val header = rawdata.first() val tbldata = rawdata.filter(_(0) != header(0)) 

स्पार्क 2.0 के अनुसार, सीएसवी को सीधे DataFrame में पढ़ा जा सकता है।

यदि डेटा फ़ाइल में एक शीर्ष पंक्ति नहीं है, तो यह होगा:

 val df = spark.read.csv("file://path/to/data.csv") 

वह डेटा लोड करेगा, लेकिन प्रत्येक कॉलम जेनेरिक नामों को जैसे _c0 , _c1 , आदि दें।

अगर शीर्षलेख जोड़ रहे हैं तो .option("header", "true") पहली पंक्ति का उपयोग DataFrame में कॉलम को परिभाषित करने के लिए DataFrame :

 val df = spark.read .option("header", "true") .csv("file://path/to/data.csv") 

एक ठोस उदाहरण के लिए, मान लें कि आपकी सामग्री के साथ फाइल है:

 user,topic,hits om,scala,120 daniel,spark,80 3754978,spark,1 

फिर निम्नलिखित विषय के आधार पर कुल हिट मिलेगा:

 import org.apache.spark.sql.functions._ import spark.implicits._ val rawData = spark.read .option("header", "true") .csv("file://path/to/data.csv") // specifies the query, but does not execute it val grouped = rawData.groupBy($"topic").agg(sum($"hits)) // runs the query, pulling the data to the master node // can fail if the amount of data is too much to fit // into the master node's memory! val collected = grouped.collect // runs the query, writing the result back out // in this case, changing format to Parquet since that can // be nicer to work with in Spark grouped.write.parquet("hdfs://some/output/directory/") // runs the query, writing the result back out // in this case, in CSV format with a header and // coalesced to a single file. This is easier for human // consumption but usually much slower. grouped.coalesce(1) .write .option("header", "true") .csv("hdfs://some/output/directory/")