दिलचस्प पोस्ट
ऑनक्लिक = "" बनाम ईवेंट हैंडलर "पॉइंटर टू पॉइन्टर टू गैर-कॉन्स्ट" को "पॉइंटर टू पॉइन्टर टू कॉन्स्ट" करने के लिए क्यों यह कानूनी नहीं है? कैसे jQuery के साथ एक रेडियो बटन को चेक करें? मैं Django ModelForm में विदेशी कुंजी विकल्प कैसे फ़िल्टर करूं? सफ़ारी 3 पार्टी कुकी आइफ्रेम चाल अब काम नहीं कर रहा है? mmap () बनाम पढ़ने के ब्लॉक तिथि के अनुसार अर्रे सूची में ऑब्जेक्ट सॉर्ट करें? मैं सी # में गतिशील गुण कैसे बना सकता हूं? जीटोसिस बनाम जीइटोलाइट? क्रॉकफोर्ड का प्रोटोटाइप उत्तराधिकार – नेस्टेड ऑब्जेक्ट्स के साथ समस्याएं स्विंग में धीरे-धीरे एक छवि को कैसे घुमाएंगे? क्या मैं WKWebView द्वारा उपयोग की जाने वाली कुकी सेट कर सकता हूं? नकारात्मक पूर्णांक विभाजन आश्चर्यजनक परिणाम बाल नोड्स पाने का सर्वोत्तम तरीका एकाधिक चयन बॉक्स के सभी चयनित मान कैसे प्राप्त करें?

स्पार्क-सीएसवी का उपयोग करके एकल सीएसवी फाइल लिखें

मैं https://github.com/databricks/spark-csv का उपयोग कर रहा हूँ, मैं एक सीएसवी लिखने की कोशिश कर रहा हूं, लेकिन सक्षम नहीं है, यह एक फ़ोल्डर बना रहा है

एक स्काला फ़ंक्शन की आवश्यकता है जो पथ और फ़ाइल नाम की तरह पैरामीटर लेगा और उस सीएसवी फ़ाइल को लिख सकता है।

वेब के समाधान से एकत्रित समाधान "स्पार्क-सीएसवी का उपयोग करके एकल सीएसवी फाइल लिखें"

यह एकाधिक फ़ाइलों के साथ एक फ़ोल्डर बना रहा है, क्योंकि प्रत्येक विभाजन व्यक्तिगत रूप से सहेजा जाता है यदि आपको एक एकल आउटपुट फाइल की आवश्यकता है (फिर भी फ़ोल्डर में) तो आप सहेजने से पहले डेटा फ्रेम को repartition : repartition कर सकते हैं:

 df // place all data in a single partition .coalesce(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv") 

सभी डेटा को mydata.csv/part-00000 लिखा जाएगा। इस विकल्प का उपयोग करने से पहले सुनिश्चित करें कि आप समझ रहे हैं कि क्या चल रहा है और एक एकल कार्यकर्ता को सभी डेटा स्थानांतरित करने की लागत क्या है अगर आप प्रतिकृति के साथ वितरित फाइल सिस्टम का उपयोग करते हैं, तो डेटा को कई बार स्थानांतरित किया जाएगा – पहली बार एक एकल कार्यकर्ता को प्राप्त किया जाता है और बाद में संग्रहण नोड्स पर वितरित किया जाता है।

वैकल्पिक रूप से आप अपना कोड छोड़ सकते हैं क्योंकि यह सामान्य उद्देश्य उपकरण जैसे cat या getmerge को बाद में सभी भागों में मर्ज करने के लिए getmerge

मैं यहां खेल को थोड़ा देर से कर सकता हूं, लेकिन coalesce(1) या repartition(1) उपयोग करना छोटे डेटा सेट के लिए काम कर सकता है, लेकिन बड़े डेटा-सेट सभी एक नोड पर एक नोड पर फेंक दिए जाएंगे। यह ओओएम त्रुटियों को फेंकने की संभावना है, या सबसे अच्छा, धीरे धीरे प्रक्रिया

मैं अत्यधिक सुझाव देता हूं कि आप Hadoop API से FileUtil.copyMerge() फ़ंक्शन का उपयोग करते हैं। यह एक एकल फाइल में आउटपुट को मर्ज करेगा।

संपादित करें – यह कारगर ढंग से एक निष्पादक नोड के बजाय डेटा में चालक को लाता है। Coalesce() ठीक हो जाएगा यदि एक निष्पादक के पास चालक की तुलना में उपयोग के लिए अधिक रैम है।

2 संपादित करें: कॉपी मेर्ज () को Hadoop 3.0 में हटाया जा रहा है। नवीनतम संस्करण के साथ काम करने के तरीके के बारे में अधिक जानकारी के लिए निम्नलिखित स्टैक अतिप्रवाह लेख देखें: Hadoop 3.0 में Hadoop कैसे करें

अगर आप एचडीएफएस के साथ स्पार्क चला रहे हैं, तो मैं सामान्य रूप से सीएसवी फाइलों को लिखकर और मर्ज करने के लिए एचडीएफएफ का इस्तेमाल करके समस्या को हल कर रहा हूं। मैं स्पार्क (1.6) में सीधे कर रहा हूं:

 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ def merge(srcPath: String, dstPath: String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) // the "true" setting deletes the source files once they are merged into the new output } val newData = << create your dataframe >> val outputfile = "/user/feeds/project/outputs/subject" var filename = "myinsights" var outputFileName = outputfile + "/temp_" + filename var mergedFileName = outputfile + "/merged_" + filename var mergeFindGlob = outputFileName newData.write .format("com.databricks.spark.csv") .option("header", "false") .mode("overwrite") .save(outputFileName) merge(mergeFindGlob, mergedFileName ) newData.unpersist() 

याद नहीं कर सकते कि मुझे यह चाल कहाँ से मिली है, लेकिन यह आपके लिए काम कर सकता है। एंड्रयू, bytesumo.com

यदि आप डाटाट्रिक्स का उपयोग कर रहे हैं और एक कर्मचारी पर सभी डेटा रैम में फिट कर सकते हैं (और इस तरह से .coalesce (1)) का उपयोग कर सकते हैं, तो आप परिणामस्वरूप सीएसवी फ़ाइल को खोजने और स्थानांतरित करने के लिए dbfs का उपयोग कर सकते हैं:

 val fileprefix= "/mnt/aws/path/file-prefix" dataset .coalesce(1) .write //.mode("overwrite") // I usually don't use this, but you may want to. .option("header", "true") .option("delimiter","\t") .csv(fileprefix+".tmp") val partition_path = dbutils.fs.ls(fileprefix+".tmp/") .filter(file=>file.name.endsWith(".csv"))(0).path dbutils.fs.cp(partition_path,fileprefix+".tab") dbutils.fs.rm(fileprefix+".tmp",recurse=true) 

यदि आपकी फ़ाइल कार्यकर्ता पर रैम में फिट नहीं है, तो आप FileUtils.copyMerge () का उपयोग करने के लिए अराजक 3quilibrium के सुझाव पर विचार कर सकते हैं। मैंने ऐसा नहीं किया है, और अभी तक नहीं पता है कि क्या संभव है या नहीं, उदाहरण के लिए, एस 3 पर।

यह उत्तर इस सवाल के पिछले उत्तरों पर और साथ ही प्रदान किए गए कोड स्निपेट के अपने परीक्षणों पर बनाया गया है। मैं इसे मूल रूप से डाटाट्रिक्स पर पोस्ट कर रहा हूं और यहां इसे पुनर्प्रकाशित कर रहा हूं ।

मुझे लगता है कि dbfs के आरएम के पुनरावर्ती विकल्प के लिए सबसे अच्छा दस्तावेज एक डाटाट्रिक्स फ़ोरम पर है ।

आपके द्वारा सहेजने से पहले 1 विभाजन को पुनः विभाजन / संगठित करना (आप अभी भी एक फ़ोल्डर प्राप्त करते हैं, लेकिन इसमें इसमें एक भाग फ़ाइल होगी)

आप rdd.coalesce(1, true).saveAsTextFile(path) उपयोग कर सकते हैं। rdd.coalesce(1, true).saveAsTextFile(path)

यह डेटा पथ / भाग -00000 में गायिका फ़ाइल के रूप में संग्रहीत करेगा

जावा का उपयोग करने का एक और तरीका है

 import java.io._ def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) { val p = new java.io.PrintWriter(f); try { op(p) } finally { p.close() } } printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}