DBInputFormat adatok átvitele SQL-ből NoSQL adatbázisba

Ennek a blognak az a célja, hogy megtanulja, hogyan kell adatokat átvinni az SQL adatbázisokból a HDFS-be, hogyan lehet adatokat átvinni az SQL adatbázisokból a NoSQL adatbázisokba.

Ebben a blogban a Hadoop technológia egyik legfontosabb összetevőjének, azaz a MapReduce képességeit és lehetőségeit vizsgáljuk meg.



Ma a vállalatok a Hadoop keretrendszert választják elsődleges választásként az adattárolásra, mivel képes a nagy adatok hatékony kezelésére. De azt is tudjuk, hogy az adatok sokoldalúak, és különböző struktúrákban és formátumokban léteznek. Az ilyen sokféle adat és annak különböző formátumainak ellenőrzéséhez olyan mechanizmusra van szükség, amely az összes fajtát befogadja, ugyanakkor hatékony és következetes eredményt hoz.



A Hadoop keretrendszer legerősebb összetevője a MapReduce, amely jobban képes biztosítani az adatok és azok struktúrájának vezérlését, mint a többi társa. Bár ehhez szükség van a tanulási görbe rezsire és a programozás komplexitására, ha kezelni tudja ezeket a bonyolultságokat, akkor bármilyen adatot biztosan kezelhet a Hadoop segítségével.

A MapReduce keretrendszer minden feldolgozási feladatát alapvetően két szakaszra bontja: Map és Reduce.



A nyers adatok előkészítése ezekhez a fázisokhoz megköveteli néhány alaposztály és interfész megértését. Az újrafeldolgozás szuper osztálya az InputFormat.

Az InputFormat osztály a Hadoop MapReduce API egyik alaposztálya. Ez az osztály két fő dolog meghatározásáért felel:

  • Az adatok szétválnak
  • Lemezolvasó

Adatok megosztása alapvető koncepció a Hadoop MapReduce keretrendszerben, amely meghatározza az egyes térképi feladatok méretét és a lehetséges végrehajtó szervert is. Az Record Reader felelős a bemeneti fájl tényleges beolvasásáért és azok (kulcs / érték párokként) beküldéséért a leképezőhöz.



A leképezők számát a felosztások száma alapján határozzuk meg. Az InputFormat feladata a felosztások létrehozása. Az idő felosztásának nagysága megegyezik a blokk méretével, de nem mindig jön létre a felosztás a HDFS blokk méret alapján. Teljesen attól függ, hogy az InputFormat getSplits () metódusát hogyan írták felül.

Alapvető különbség van az MR felosztás és a HDFS blokk között. A blokk egy fizikai adatrész, míg a felosztás csak egy logikai darab, amelyet a térképkészítő olvas. A felosztás nem tartalmazza a bemeneti adatokat, csak az adatok hivatkozását vagy címét tartalmazza. A felosztásnak alapvetően két dolga van: a bájtok hossza és a tárolási helyek összessége, amelyek csak karakterláncok.

Hogy jobban megértsük, vegyünk egy példát: A MySQL-ben tárolt adatok feldolgozása MR használatával. Mivel ebben az esetben nincs blokk fogalom, az elmélet: „a hasításokat mindig a HDFS blokk alapján hozzák létre”,nem sikerül. Az egyik lehetőség az, hogy osztásokat hoz létre a MySQL tábla sortartományai alapján (és ezt teszi a DBInputFormat, egy bemeneti formátum az adatok relációs adatbázisokból történő olvasásához). Előfordulhat, hogy k száma osztódik, amelyek n sorból állnak.

Csak a FileInputFormat (InputFormat a fájlokban tárolt adatok kezeléséhez) alapú InputFormats esetében hozhatók létre a felosztások a bemeneti fájlok teljes mérete, bájtokban, alapján. A bemeneti fájlok FileSystem blokkméretét azonban a bemeneti felosztás felső határaként kezeljük. Ha a HDFS blokkméretnél kisebb fájlja van, akkor csak 1 leképezőt kap a fájlhoz. Ha más viselkedést szeretne, használhatja a mapred.min.split.size fájlt. De ez megint csak az InputFormat getSplits () függvénye.

Annyi már létező beviteli formátum áll rendelkezésre, amely az org.apache.hadoop.mapreduce.lib.input csomag alatt érhető el.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

menj a c ++ - ra

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

mi a keret a java-ban

TextInputFormat.html

Az alapértelmezett érték a TextInputFormat.

Hasonlóképpen, annyi kimeneti formátumunk van, amely beolvassa a reduktorok adatait és HDFS-be tárolja:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Alapértelmezésként a TextOutputFormat.

Mire befejezné a blog olvasását, megtanulta volna:

  • Hogyan lehet térképcsökkentő programot írni
  • A Mapreduce-ban elérhető különböző típusú InputFormats-ról
  • Mi az InputFormats szükséglete
  • Az egyéni InputFormats írása
  • Hogyan lehet adatokat átvinni az SQL adatbázisokból a HDFS-be
  • Hogyan lehet adatokat átvinni az SQL (itt MySQL) adatbázisokból a NoSQL adatbázisokba (itt Hbase)
  • Hogyan lehet adatokat átvinni egy SQL-adatbázisból az SQL-adatbázisok másik táblájába (Lehet, hogy ez nem annyira fontos, ha ezt ugyanabban az SQL-adatbázisban csináljuk. Ugyanakkor nincs semmi baj abban, hogy ismerjük ugyanezt. Soha nem lehet tudni hogyan kerülhet használatba)

Előfeltétel:

  • Hadoop előre telepítve
  • SQL előre telepítve
  • A Hbase előre telepítve
  • Java alapismeretek
  • MapReduce ismeretek
  • Hadoop keretrendszer alapismeretek

Értsük meg a problémamegállapítást, amelyet itt megoldunk:

Van egy alkalmazott táblánk a MySQL DB-ben az Edureka relációs adatbázisunkban. Az üzleti követelményeknek megfelelően a relációs DB-ben rendelkezésre álló összes adatot át kell helyeznünk Hadoop fájlrendszerre, azaz HDFS-re, NoSQL DB-re, Hbase néven.

Számos lehetőségünk van erre a feladatra:

  • Sqoop
  • Flume
  • MapReduce

Most nem kíván más eszközt telepíteni és konfigurálni ehhez a művelethez. Csak egy lehetőség marad, amely a Hadoop MapReduce feldolgozási keretrendszere. A MapReduce keretrendszer teljes körű ellenőrzést biztosít az adatok felett az átvitel során. Az oszlopokat manipulálhatja, és közvetlenül a két megcélzott helyre teheti.

Jegyzet:

hogyan kell kezelni a felugró szelént
  • Töltsük le és tegyük a MySQL csatlakozót a Hadoop osztályútvonalába, hogy táblákat lehozzunk a MySQL táblából. Ehhez töltse le a com.mysql.jdbc_5.1.5.jar csatlakozót, és tartsa a Hadoop_home / share / Hadoop / MaPreduce / lib könyvtárban.
cp Letöltések / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Ezenkívül tegye az összes Hbase edényt a Hadoop osztályút alá, hogy MR programja hozzáférjen a Hbase fájlhoz. Ehhez hajtsa végre a következő parancsot :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

A feladat végrehajtásához használt szoftververziók a következők:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Holdfogyatkozás

Annak elkerülése érdekében, hogy a program bármilyen kompatibilitási probléma esetén elkerülhető legyen, előírom az olvasóimnak, hogy futtassák a parancsot hasonló környezettel.

Egyéni DBInputWritable:

com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementates Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) dob IOException {} public void readFields (ResultSet rs) throws SQLException // A Resultset objektum az SQL utasításból visszaküldött adatokat ábrázolja {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) dobja az IOException { } public void write (PreparedStatement ps) dobja az SQLException {ps.setInt (1, id) ps.setString (2, név) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} nyilvános karakterlánc getDept () {return dept}}

Egyéni DBOutputWritable:

com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementates Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) dobja az IOException {} public void readFields (ResultSet rs) dobja az SQLException {} public void write (DataOutput out) dobja az IOException {} public void write (PreparedStatement) ps) dobja az SQLException {ps.setString (1, név) ps.setInt (2, id) ps.setString (3, dept)}}

Beviteli táblázat:

adatbázis létrehozása edureka
tábla emp létrehozása (empid int not null, varchar név (30), varchar dept (20), elsődleges kulcs (empid))
illessze be az emp értékekbe (1, 'abhay', 'fejlődés'), (2, 'brundesh', 'teszt')
válassza ki az * em

1. eset: Átvitel a MySQL-ről a HDFS-re

com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) kivételt dob ​​{Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // illesztőprogram osztály' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // felhasználónév' root ') // jelszó Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormset.utormFutPathPathPath.utform). új elérési út (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // beviteli tábla neve null, null, új karakterlánc [] {'empid', 'name', 'dept'} / / tábla oszlopok) Útvonal p = új útvonal (args [0]) FileSystem fs = FileSystem.get (új URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ez a kóddarab előkészítheti vagy konfigurálhatja az inputformátumot az SQL SQL forrásunk eléréséhez. A paraméter tartalmazza az illesztőprogram osztályt, az URL tartalmazza az SQL adatbázis címét, felhasználónevét és jelszavát.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // illesztőprogram osztály 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // felhasználónév 'root') //Jelszó

Ez a kóddarab lehetővé teszi, hogy átadjuk az adatbázisban lévő táblázatok részleteit, és beállítsuk a jobobjektumba. A paraméterek természetesen tartalmazzák a jobpéldányt, az egyéni írható osztályt, amelynek meg kell valósítania a DBWritable felületet, a forrástábla nevét, a feltétel esetleges null értékét, az egyéb rendezési paramétereket null nullát, illetve a táblázat oszlopainak listáját.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} // táblázat oszlopai)

Mapper

com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map kiterjeszti a Mappert {
védett érvénytelen térkép (LongWritable kulcs, DBInputWritable érték, Context ctx) {próbálkozzon {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (új szöveg (név + '' + id + '' + oszt.), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reduktor: Identity Reducer Használt

Parancs futtatásra:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Kimenet: MySQL tábla átkerül a HDFS-be

hadoop dfs -ls / dbtohdfs / *

2. eset: Áthelyezés a MySQL egyik táblázatából a másikba a MySQL-ben

kimeneti tábla létrehozása a MySQL-ben

tábla alkalmazott1 létrehozása (név varchar (20), id int, dept varchar (20))

com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) kivételt dob ​​{Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // felhasználónév' root ') // jelszó job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // beviteli táblázat neve null, null, új karakterlánc [] {'empid ',' név ',' dept '} // táblázat oszlopai) DBOutputFormat.setOutput (job,' worker1 ', // output table name new String [] {' név ',' id ',' dept '} // tábla oszlopok) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ez a kóddarab konfigurálhatja a kimeneti tábla nevét az SQL DB-ben. A paraméterek a jobpéldány, a kimeneti tábla neve és a kimeneti oszlopok neve.

DBOutputFormat.setOutput (job, 'worker1', // output table name new String [] {'név', 'id', 'dept'} // táblázat oszlopai)

Mapper: Ugyanaz, mint az 1. eset

Csökkentő:

com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce kiterjeszti Reducer {védett void csökkentés (szövegkulcs, Iterable értékek, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (sor [0] .toString (), Integer.parseInt (sor [1] .toString ()), [2] sor .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Parancs a futtatásra:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Kimenet: Adatok átvitele a MySQL EMP táblázatából a MySQL másik tábla alkalmazottjához1

3. eset: Áthelyezés a MySQL táblázatából a NoSQL (Hbase) táblába

Hbase tábla létrehozása az SQL tábla kimenetének elhelyezéséhez:

'alkalmazott', 'hivatalos_info' létrehozása

Vezető osztály:

csomag Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] argumentum) kivételt dob ​​{Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // felhasználónév 'root') // jelszó job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ( 'alkalmazott', Reduce.class, munka) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (TableOutputFormat. osztály) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // beviteli tábla neve null, null, new String [] {'empid', 'name', 'dept'} // táblázat oszlopai) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ez a kóddarab lehetővé teszi a kimeneti kulcsosztály konfigurálását, amely hbase esetén ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Itt adjuk át a hbase tábla nevét és a reduktort, hogy az asztalon működjön.

TableMapReduceUtil.initTableReducerJob ('alkalmazott', Reduce.class, job)

Mapper:

csomag Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map kiterjeszti a Mapper {private IntWritable one = új IntWritable (1) védett érvénytelen térképet (LongWritable id, DBInputWritable value, Context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (új ImmutableBytesWritable (Bytes.toBytes (cd)), új szöveg (line + ') '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Ebben a kódrészletben értékeket veszünk a DBinputwritable osztály gettereiből, majd továbbadjuk őket
ImmutableBytesWritable úgy, hogy a Hbase által megértett bytewriatble formában érjék el a reduktort.

Karakterlánc sor = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (új ImmutableBytesWritable (Bytes.toBytes (cd)), új szöveg (line + '+ + dept ))

Csökkentő:

csomag Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce extends TableReducer {public void reduc (ImmutableBytesWritable key, Iterable values, Context context) dobja az IOException, InterruptedException {String [] oka = null // Loop értékek mert (Szöveg val: értékek) {okoz = val.toString (). split ('')} // HBase-be tesz Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('név'), Bytes.toBytes (ok [0])) put.add (Bytes.toBytes ('hivatalos_info'), Bytes.toBytes ('osztály'), Bytes.toBytes (ok [1 ])) context.write (kulcs, betét)}}

Ez a kóddarab lehetővé teszi, hogy eldöntsük a pontos sort és azt az oszlopot, amelyben az értékeket tároljuk a szűkítőből. Itt minden egyes emidet külön sorban tárolunk, mivel az empid-et sorkulcsként készítettük, amely egyedi lenne. Minden sorban a „hivatalos_info” oszlopcsaládban az alkalmazottak hivatalos adatait tároljuk a „név” és az „osztály” oszlopok alatt.

Put put = new Put (kulcs.get ()) put.add (Bájt.Bájt ('hivatalos_info'), Bájt.Bájt ('név'), Bájt.Bájt (ok [0])) put.add (Bájt. toBytes ('hivatalos_info'), Bytes.toBytes ('osztály'), Bytes.toBytes (ok [1])) context.write (kulcs, put)

Átadott adatok a Hbase-ben:

beolvasási alkalmazott

Amint látjuk, sikeresen elvégezhettük üzleti adatok migrációját egy relációs SQL DB-ről NoSQL DB-re.

A következő blogban megtudhatjuk, hogyan kell kódokat írni és végrehajtani más bemeneti és kimeneti formátumokhoz.

Tegye közzé észrevételeit, kérdéseit vagy bármilyen visszajelzését. Szívesen hallanék tőled.

Van egy kérdésünk? Kérjük, említse meg a megjegyzések részben, és kapcsolatba lépünk Önnel.

Kapcsolódó hozzászólások: