Hadoop and Spark Installation on Raspberry Pi-3 Cluster – Part-2

Cluster Architecture

  • The Master Node will be connected to the home router via WiFi
  • Master-to-Slave connections will be established through an Ethernet switch via Ethernet cables
  • From my Mac (which will be on my home network), I will be able to SSH to the master node and control the whole cluster

For the Spark/Hadoop cluster, there are a few more TODOs that we need to take care of:

  • Update /etc/hosts on every node (master and slave) with the hostname and IP address of every other node
  • Use the same superuser and group for all installations on every node
  • Enable SSH on every node and establish passwordless SSH from the Master to every Slave node.
  • Install zip/unzip and Java on every node.

In this part we will see the SINGLE NODE SETUP. I will be using my Mac to perform all these steps.

Step #1: Load the Raspbian image onto the MicroSD Card

  • Download SD Formatter from https://www.sdcard.org/downloads/formatter_4/eula_mac/index.html
  • Format the card (follow the steps from Google)
  • Download the Raspbian_Lite OS from https://www.raspberrypi.org/ and follow these instructions
    • diskutil list (this lists the newly added disk; in my case it was /dev/disk4)
    • diskutil unmountDisk /dev/disk4
    • sudo dd bs=1m if=~/Downloads/2017-01-11-raspbian-jessie-lite.img of=/dev/rdisk4

Step #2: Configure the Pi, connect it to WiFi and install the latest patches.

  • sudo raspi-config
    • Change the password
    • Localisation Options (change the locale to en_US, the timezone to US Eastern and the WiFi country to US)
    • Advanced Options – change the memory split from 64 to 16 (Raspbian Lite has no UI, so it needs only a very small GPU memory footprint)
    • Interfacing Options – Enable SSH
  • sudo vi /etc/network/interfaces
    • eth0 is the wired Ethernet connection, which we will use for the cluster network
    • wlan0 is the on-board WiFi adapter: configure it so that we can pull updates for Raspbian Lite over WiFi
    • change manual to dhcp – this tells the interface to get its settings via DHCP – and add the SSID and password of the home router
      • change the line “iface wlan0 inet manual” to
 
iface wlan0 inet dhcp
    wpa-ssid "SSID/NETWORK_NAME"
    wpa-psk "PASSWORD"
 
  •  Reboot the interface
    • sudo ifdown wlan0
    • sudo ifup wlan0
  • Now the Raspberry Pi is connected to WiFi
  • Update and upgrade the Raspberry Pi
    • sudo apt-get update
    • sudo apt-get upgrade

Step #3: Create separate superuser and group

  • We will use this user and group for all our core installations and configuration changes on all nodes
 
sudo addgroup hadoop
sudo adduser --ingroup hadoop hduser
sudo adduser hduser sudo
su hduser
 

Step #4: Download and Install Required Software

  • Download and install zip utility
    • sudo apt-get install zip unzip
  • Download and install java
    • sudo apt-get install oracle-java7-jdk
  • Download, install and configure Spark
    wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
    sudo tar -xvzf spark-2.1.0-bin-hadoop2.7.tgz -C /opt/
    sudo chown -R hduser /opt/spark-2.1.0-bin-hadoop2.7
    source ~/.bashrc (run this after adding the environment variables at the end of this step, so that SPARK_HOME is set)
    cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
    Add the following to $SPARK_HOME/conf/spark-env.sh:
        SPARK_MASTER_IP=192.168.2.1
        SPARK_WORKER_MEMORY=512m
  • Download, install and configure Hadoop
    wget http://apache.osuosl.org/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz
    sudo mkdir -p /opt
    cd ~
    sudo tar -xvzf hadoop-2.7.3.tar.gz -C /opt/
    cd /opt
    sudo chown -R hduser:hadoop hadoop-2.7.3/
    sudo nano /opt/hadoop-2.7.3/etc/hadoop/hadoop-env.sh and set
    export JAVA_HOME=/usr/lib/jvm/jdk-7-oracle-arm-vfp-hflt/jre
    
  • sudo nano /opt/hadoop-2.7.3/etc/hadoop/hdfs-site.xml
     
            <property>
                    <name>dfs.replication</name>
                    <value>1</value>
            </property>
    
  • sudo nano /opt/hadoop-2.7.3/etc/hadoop/core-site.xml
     
            <property>
                    <name>fs.default.name</name>
                    <value>hdfs://rpi3-0:54310</value>
            </property>
            <property>
                    <name>hadoop.tmp.dir</name>
                    <value>/hdfs/tmp</value>
            </property>
    
  • sudo nano /opt/hadoop-2.7.3/etc/hadoop/mapred-site.xml
     
      <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
      </property>
      <property>
        <name>mapreduce.map.memory.mb</name>
        <value>256</value>
      </property>
      <property>
        <name>mapreduce.map.java.opts</name>
        <value>-Xmx204m</value>
      </property>
      <property>
        <name>mapreduce.reduce.memory.mb</name>
        <value>102</value>
      </property>
      <property>
        <name>mapreduce.reduce.java.opts</name>
        <value>-Xmx102m</value>
      </property>
      <property>
        <name>yarn.app.mapreduce.am.resource.mb</name>
        <value>128</value>
      </property>
      <property>
        <name>yarn.app.mapreduce.am.command-opts</name>
        <value>-Xmx102m</value>
      </property>
    
  • sudo nano /opt/hadoop-2.7.3/etc/hadoop/yarn-site.xml
     
            <property>
                    <name>yarn.resourcemanager.resource-tracker.address</name>
                    <value>rpi3-0:8025</value>
            </property>
            <property>
                    <name>yarn.resourcemanager.scheduler.address</name>
                    <value>rpi3-0:8030</value>
            </property>
            <property>
                    <name>yarn.resourcemanager.address</name>
                    <value>rpi3-0:8050</value>
            </property>
            <property>
                    <name>yarn.nodemanager.aux-services</name>
                    <value>mapreduce_shuffle</value>
            </property>
            <property>
                    <name>yarn.nodemanager.resource.cpu-vcores</name>
                    <value>4</value>
            </property>
            <property>
                    <name>yarn.nodemanager.resource.memory-mb</name>
                    <value>1024</value>
            </property>
            <property>
                    <name>yarn.scheduler.minimum-allocation-mb</name>
                    <value>128</value>
            </property>
            <property>
                    <name>yarn.scheduler.maximum-allocation-mb</name>
                    <value>1024</value>
            </property>
            <property>
                    <name>yarn.scheduler.minimum-allocation-vcores</name>
                    <value>1</value>
            </property>
            <property>
                    <name>yarn.scheduler.maximum-allocation-vcores</name>
                    <value>4</value>
            </property>
            <property>
                    <name>yarn.nodemanager.vmem-check-enabled</name>
                    <value>false</value>
            </property>
            <property>
                    <name>yarn.nodemanager.pmem-check-enabled</name>
                    <value>true</value>
            </property>
            <property>
                    <name>yarn.nodemanager.vmem-pmem-ratio</name>
                    <value>4</value>
            </property>
    
  • Add environment variables to the bashrc file
    sudo nano ~/.bashrc

    export JAVA_HOME=/usr/lib/jvm/jdk-7-oracle-arm-vfp-hflt/jre
    export HADOOP_HOME=/opt/hadoop-2.7.3
    export HADOOP_MAPRED_HOME=$HADOOP_HOME
    export HADOOP_COMMON_HOME=$HADOOP_HOME
    export HADOOP_HDFS_HOME=$HADOOP_HOME
    export YARN_HOME=$HADOOP_HOME
    export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    
    export SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.7 
    export PATH=$PATH:$SPARK_HOME/bin  
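
  • Optionally, sanity-check the Spark install. This is a minimal sketch, assuming Spark local mode (no YARN needed at this point): open spark-shell and run a tiny Scala job.

    spark-shell --master local[2]

    scala> val nums = sc.parallelize(1 to 1000)
    scala> nums.filter(_ % 2 == 0).count()     // should return 500
    scala> :quit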
    

Step #5: Create image from SD Card and clone it to all other PIs on the cluster

  • Switch off the Raspberry Pi-3, take the SD card out and plug it into the Mac
  • Run the below commands
    • diskutil list
    • sudo dd if=/dev/disk4 of=~/Downloads/raspberrypi_base_with_hdp.dmg
  • To write the image onto the SD cards of the other nodes
    • diskutil unmountDisk /dev/disk4
    • sudo dd bs=1m if=~/Downloads/raspberrypi_base_with_hdp.dmg of=/dev/rdisk4

References:
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
https://dqydj.com/raspberry-pi-hadoop-cluster-apache-spark-yarn/

Hadoop and Spark Installation on Raspberry Pi-3 Cluster – Part-1

In this part we will go through the hardware configuration of the Raspberry Pi-3 4-node cluster setup.

Hardware Configuration


1. Raspberry Pi 3 Model B 2016 Single Board Computer – 4 = $120 (bought from Microcentre)
2. Samsung EVO 32GB Class 10 Micro SDHC Card with Adapter (MB-MP32DA/AM) – 4 ($12 each) = $48
3. Cable Matters 8-Pack, Cat5E Snagless Ethernet Patch Cable in Blue 3 Feet – 4 = $10
4. TRENDnet 8-Port Unmanaged 10/100 Mbps GREENnet Ethernet Desktop Plastic Housing Switch, TE100-S8 – 1 = $15
5. Anker [4-Pack] PowerLine Micro USB (1ft) – 1 = $10
6. StarTech.com USB2TYPEM 3 Feet USB to Type M Barrel 5V DC Power Cable – 1 = $4
7. 100 Pcs M2.5 x 10mm + 6mm PC Board Hexagonal Hex Threaded Spacer – 1 = $14
8. ORICO DUB-10P-WH 96W 10 Ports Family-Sized Smart Super USB Charger with 10 × 5V 2.4A Port – White – 1 = $20
9. Machine Screws (bought from Home Depot) – 2 = under $2
10. Thin card board from Home Depot – $4

Total Budget under $250

Except for the machine screws, cardboard and Raspberry Pis, everything above was bought on Amazon and Newegg.

Additionally, you can buy heat sinks and fans depending on the load and how heavily you plan to use this cluster.

My personal cluster setup steps:

  • Drill 4 holes into the cardboard matching the mounting holes on the Raspberry Pi
  • Stack two nylon standoffs together to form 20 such sets
  • Insert screws from below the cardboard and 4 stacked nylon standoffs from above the cardboard
  • Keep stacking the Raspberry Pis (RPIs) one by one and tighten the standoffs.
  • Connect 4 micro USB cables from the RPIs to the USB switch
  • Connect 4 Ethernet cables from the RPIs to the Ethernet switch
  • Connect the Ethernet switch to the USB switch via the USB2TYPEM cable.
  • Insert a 32GB Samsung SD card into each of the RPIs
  • Switch on the USB switch and the entire cluster will be powered.


Important YARN configuration properties

To configure YARN and MapReduce on top of YARN, we should look into a couple of configuration files:

  • yarn-site.xml
  • mapred-site.xml

yarn-site.xml

  • yarn.scheduler.minimum-allocation-mb: The minimum allocation for every container request at the RM
  • yarn.scheduler.maximum-allocation-mb: The maximum allocation for every container request at the RM
  • yarn.scheduler.minimum-allocation-vcores: The minimum allocation for every container request at the RM, in terms of virtual CPU cores.
  • yarn.scheduler.maximum-allocation-vcores: The maximum allocation for every container request at the RM, in terms of virtual CPU cores.
  • yarn.nodemanager.resource.memory-mb: Amount of physical memory that can be allocated for containers, i.e., the total RAM on a given node that the NodeManager can use to create containers
  • yarn.nodemanager.resource.cpu-vcores: Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of physical cores used by YARN containers.
  • yarn.nodemanager.pmem-check-enabled: Whether physical memory limits will be enforced for containers.
  • yarn.nodemanager.vmem-check-enabled: Whether virtual memory limits will be enforced for containers.
  • yarn.nodemanager.vmem-pmem-ratio: Ratio between virtual memory to physical memory when setting memory limits for containers. Container allocations are expressed in terms of physical memory, and virtual memory usage is allowed to exceed this allocation by this ratio.

Virtual Memory: physical + paged memory
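
As a quick worked example with the yarn-site.xml values used above for the Pi:

    1024 MB node memory / 128 MB minimum allocation = at most 8 containers per NodeManager
    256 MB container x vmem-pmem-ratio of 4 = 1024 MB of virtual memory allowed (when the vmem check is enabled)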

Reference:
https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
http://hortonworks.com/blog/how-to-plan-and-configure-yarn-in-hdp-2-0/

mapred-site.xml

  • mapreduce.framework.name: yarn
  • mapreduce.map.memory.mb: The amount of memory to request from the YARN scheduler for each map task. This is total physical RAM of a Map Task Container.
  • mapreduce.map.java.opts: The JVM heap size for the map task (roughly 0.8 times the container RAM above), so that the JVM memory stays within the container's physical memory
  • mapreduce.reduce.memory.mb: The amount of memory to request from the YARN scheduler for each reduce task.
  • yarn.app.mapreduce.am.resource.mb: The amount of memory the MR AppMaster needs.
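
As a quick worked example, the map-task settings in the mapred-site.xml above follow this rule of thumb:

    mapreduce.map.memory.mb = 256 MB (container size)
    mapreduce.map.java.opts = -Xmx204m (0.8 x 256 MB ≈ 204 MB of heap, leaving headroom for JVM overhead)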

Reference:
https://hadoop.apache.org/docs/r2.7.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

Hadoop 2.x Architecture

To address the limitations of Hadoop 1.x, Hadoop 2.x is designed in a generic way to accommodate more than just Hadoop MapReduce (MR V1), so that the Hadoop cluster can also be used by Spark, Kafka, Storm, and other distributed frameworks. Hadoop 2.x is also referred to as YARN (Yet Another Resource Negotiator) or MapReduce V2.

Drawing a parallel between Hadoop 1.x and Hadoop 2.x

  • Job tracker is split into Resource Manager and Application Master
  • Task Tracker is split into Node Manager and Containers

Components of MR V2

  • Resource Manager (one per cluster): It does the job of pure scheduling, i.e., identifying the resources across the cluster and assigning them to the competing applications. It has two modules:
    • Scheduler: This is pluggable and can be changed based on requirements, e.g., the Capacity Scheduler or the Fair Scheduler
    • ApplicationsManager: This accepts job submissions and is responsible for restarting the Application Master in case of its failure.

Note: Resource Manager failure itself can be handled with ZooKeeper.

  • Application Master (one per application/job): This is responsible for negotiating resources with the Resource Manager; once it gets the resources (containers on slave nodes), it talks to the Node Managers to get the tasks executed, tracking their status and progress. The major benefits this component brings to the architecture:
    • Scalability: By splitting the resource management/scheduling from application life cycle management, the cluster can be scaled much more than MR V1.
    • Generic Architecture: By moving all the application specific runtime complexities into the Application Master, we will be able to launch any type of frameworks like Kafka, Storm, Spark, etc
      The Application Master also supports a very generic resource model: it can request very specific resources like the amount of RAM required, number of cores, etc.
  • Node Manager (one per Slave Node): This is responsible for launching containers inside the node with the amount of resources specified by the Application Master. YARN allows applications (AMs) to launch any process (not only Java, unlike MR V1) by providing
    • the command to launch the process within the container
    • the environment variables required for the process
    • the local resources required on that node prior to launch (any 3rd-party jars, etc.)
    • security tokens (if any)
  • Container (one/many per node): This is the lowest level slave node component used to process the data.

Hadoop 2.x/YARN/MR V2 Architecture

(architecture diagram)

Application Flow

#1: The client submits a job to the cluster, which is received by the Resource Manager
#2: The Resource Manager (RM) launches the Application Master in one of the available containers (here container #C5)
#3: On bootup, the Application Master (AM) registers itself with the RM and then requests the resources needed to run the job on the cluster
#4: The RM responds to the AM with 2 containers, #C1 and #C8
#5: The AM then asks the Node Managers (NMs) of #C1 and #C8 to launch the containers
#6: While running the job, the containers report their status back to the AM
#7: While all this is going on, the client communicates directly with the AM to get the status of the job
#8: Once the job is complete, the AM unregisters itself from the RM, making its container available for other jobs

Hadoop 1.x Architecture and Drawbacks

Hadoop is built on the ideas from two whitepapers published by Google (the Google File System and MapReduce), which correspond to:

  • HDFS
  • Map Reduce

HDFS: Hadoop Distributed File System

It is different from a normal file system in that the data copied onto HDFS is split into 'n' blocks and each block is stored on a different node in the cluster. To achieve this, HDFS uses a master-slave architecture:

  • HDFS Master => Name Node: Takes the client request and responsible for orchestrating the data copy across the cluster
  • HDFS Slave => Data Node: Actually saves the block of data and coordinates with its master

MapReduce: This is the processing engine and is also implemented in master-slave architecture.

  • MR Master => Job Tracker: Takes the incoming jobs, identifies the available resources across the cluster, divides the job into tasks and submits it to the cluster
  • MR Slave => Task Tracker: Actually runs the task and coordinates with its master.

Architecture

(architecture diagram)

Drawbacks

  • The JobTracker is designed in such a way that it is tightly coupled with two important responsibilities, “Resource Management” and “MapReduce Task Execution”. Because of this, the cluster cannot be used for distributed computing technologies other than Hadoop MapReduce, like Spark/Kafka/Storm/…
  • The Name Node can maintain metadata for at most about 4000-5000 data nodes. This limits cluster scalability to 4k-5k nodes
  • Hard partitioning of slots into Mapper and Reducer slots
  • The JobTracker is a Single Point Of Failure (SPOF)
  • Iterative applications (e.g., Machine Learning) are very slow (up to 10x slower than on YARN)
  • Lack of wire-compatible protocols between client and server in MapReduce applications (unlike Hive and Pig, which can support multiple versions on the same cluster)

Hadoop 2.x was released to address these drawbacks.

role of a software architect

As an architect, we should always focus on these areas:

  • Envision: Software Architecture
  • Model: Design Pattern
  • Blueprint: Construction and Design Principles
  • Inspect: Construction Patterns
  • Nomenclature: Architect Jargon to communicate intent.

Software Architecture

  • Web Server? Pipelined Architectures
  • Cloud? N-Tier Network Architecture
  • SOA? Component Architecture
  • Client/Server? Layered Architecture
  • Single Tier? Monolithic Architecture

Design Pattern

  • Creational, Structural and Behavioral Patterns
    • Creational DP: provide us a way to decouple the client from the objects that it needs to instantiate
      Singleton, Factory, Builder
    • Structural DP: help us to compose classes or objects into larger structures
      Decorator, Facade, Adapter, Proxy
    • Behavioral DP: dictates how classes and objects interact with each other and distribute the responsibilities
      Strategy, Template, Iterator
  • Plugins: Plugins enable you to execute code in response to certain events
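
To make the behavioral category a bit more concrete, here is a minimal Scala sketch of the Strategy pattern; the trait and class names are made up purely for illustration.

    // Strategy: the algorithm is pulled out behind an interface,
    // so the client can swap behaviours without changing its own code
    trait CompressionStrategy {
      def compress(data: Array[Byte]): Array[Byte]
    }

    object GzipStrategy extends CompressionStrategy {
      def compress(data: Array[Byte]): Array[Byte] = data // placeholder for real gzip logic
    }

    object SnappyStrategy extends CompressionStrategy {
      def compress(data: Array[Byte]): Array[Byte] = data // placeholder for real snappy logic
    }

    // the client depends only on the trait; the concrete behaviour is injected
    class Archiver(strategy: CompressionStrategy) {
      def archive(data: Array[Byte]): Array[Byte] = strategy.compress(data)
    }

    val archiver = new Archiver(SnappyStrategy) // swap in GzipStrategy without touching Archiver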

Design Principles

  • How does encapsulation affect code reuse? It hides complex code behind an interface, which can then easily be replaced with another implementation.
  • How does cohesion affect layers? High cohesion leads to a better layered design
  • How does coupling affect scalability? Low coupling leads to better scalability

Construction Principles

  • Favor composition (component design) over inheritance, as it reduces the number of dependencies between modules, which increases flexibility.
    Layered and tier design?
  • What is your development methodology? 3-5 week sprints based on the type of project.
  • How to avoid architecture erosion? Do periodic code reviews, build unit/integration tests and do nightly builds

Architect Jargon

  • CAP Theorem: Consistency, Availability, Partition-Tolerance
  • ACID Properties: Atomicity, Consistency, Isolation, Durability
  • Difference between a Component and a Module? A component is a fine-grained, self-contained entity that interacts with the remaining parts of the system (like a Data Access Layer), whereas a module is a coarse-grained deployable source-code bundle with a common purpose (like a JSON parser, Log4j, etc.); replacing it would not impact the overall system architecture
  • Difference between Tier and Layer? Tier is a physical unit, where the code / process runs. E.g.: client, application server, database server; Layer is a logical unit, how to organize the code. E.g.: presentation (view), controller, models, repository, data access.
  • Cohesion and Coupling: Cohesion is the degree to which the elements of a certain module belong together. Coupling is the degree of interdependence between software modules

References:
https://www.youtube.com/watch?v=t2Ti-pZGy8I
https://nofluffjuststuff.com/n/training/2017/02/20/software_architecture_training

Important aggregations in spark

Three main aggregations

  • reduceByKey(): It has an internal combiner and is used when the data aggregates heavily (many values per key). It is used only when the INTERMEDIATE/COMBINER aggregation logic is the same as the FINAL/REDUCER aggregation logic
  • aggregateByKey(): Similar to reduceByKey(), but with a custom internal combiner. It is used when the aggregation has to start from a default (zero) value
  • combineByKey(): Also similar to reduceByKey(), with a custom internal combiner. It is used when the initial value has to be derived dynamically from the first input record for a key

Comparison

  • aggregateByKey() and reduceByKey() are specialised forms of combineByKey()
  • In aggregateByKey() and combineByKey(), the TYPE of the INPUT value need not be the same as that of the OUTPUT
  • If we want custom logic in the combiner, then we go for aggregateByKey() or combineByKey(); in reduceByKey(), the combiner logic is the same as the reducer logic.

Other important aggregations:
  • groupByKey(): Used when a combiner is not required, hence when there are not many aggregations on the dataset. It provides much more flexibility for complex operations than the other aggregations.
  • countByKey(): Unlike all the above methods, which are transformations, this is an action
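
Here is a minimal spark-shell (Scala) sketch of how these calls differ; the RDD is made-up sample (key, value) pairs:

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

    // reduceByKey: combiner logic == reducer logic, input and output types are the same
    pairs.reduceByKey(_ + _).collect()                       // sums per key: (a,4), (b,6)

    // aggregateByKey: starts from a default "zero" value; output type (Set) can differ from input type (Int)
    pairs.aggregateByKey(Set.empty[Int])(_ + _, _ ++ _).collect()

    // combineByKey: the initial value is derived from the first record itself; here (sum, count) pairs
    pairs.combineByKey(
      (v: Int) => (v, 1),
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
    ).collect()

    // groupByKey has no combiner; countByKey is an action and returns a Map to the driver
    pairs.groupByKey().mapValues(_.toList).collect()
    pairs.countByKey()                                       // Map(a -> 2, b -> 2)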

Sqoop cheat sheet

Here we will discuss all possible sqoop command line options to import and export data between HDFS and RDBMS, import/export delimiters, incremental load and sqoop job/merge operations.

For practice, I downloaded the cloudera VM from http://www.cloudera.com/downloads/quickstart_vms/5-8.html

Anytime during this exercise, if you need help on sqoop queries, use sqoop help option
$sqoop --help
$sqoop import --help


Import into HDFS – Database level operations


— list databases
$ sqoop list-databases --connect "jdbc:mysql://quickstart.cloudera:3306" --username retail_dba --password cloudera

— import all tables from db to HDFS
sqoop import-all-tables -m 12 --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --as-textfile --warehouse-dir=/user/cloudera/sqoop_import/
Supported formats: avro, text and binary (sequence files)
--as-textfile, --as-avrodatafile, --as-sequencefile
-m or --num-mappers: used to define the number of parallel mappers (threads) per table
BoundingValsQuery: used to figure out the split boundaries/buckets based on the number of mappers.

— Import all tables from rdbms with compression and hive table creation
$sqoop import-all-tables \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username retail_dba \
> --password cloudera \
> --hive-import \
> --hive-overwrite \
> --create-hive-table \
> --hive-database sqoop_import \
> --compress \
> --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
> --outdir java_files

--compress and --compression-codec: used to compress the ingested files
--outdir: used to store the sqoop-generated internal Java files
--hive-import and --create-hive-table: used to import into the Hive warehouse and create Hive tables for the ingested tables
--hive-overwrite: overwrites the data in an existing table; if not mentioned, the data is appended to the existing table


Import into HDFS – Table level operations


— Import a single table from sqoop
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --target-dir /user/cloudera/departments
--table: the table name
--target-dir: the HDFS location where the table data is copied
Note: If the '-m' option is not given, the default number of mappers is 4
Note: For every table import, sqoop uses the min and max of the primary key (in the bounding-values query) and divides the records into the specified number of buckets
* Disadvantage of the above query: if there are outliers in the data, the records will be unevenly spread across mappers, with some mappers taking a heavy load and others very little

— overwrite boundary query to redefine the distribution
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --target-dir /user/cloudera/departments --boundary-query "select min(department_id), max(department_id) from departments where department_id <> 8000"

— import specific columns from a table
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --target-dir /user/cloudera/departments --boundary-query "select min(department_id), max(department_id) from departments where department_id <> 8000" --columns department_id,department_name

— import a table using specific query
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --target-dir /user/cloudera/departments --boundary-query "select min(department_id), max(department_id) from departments where department_id <> 8000" --columns department_id,department_name --query "select * from departments"
* --query and --table are mutually exclusive

— import a table without primary key
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments_nopk --target-dir /user/cloudera/departments
* This will error out because sqoop cannot split the records when there is no primary key. In this case we have to give either '-m 1' or '--split-by <column>'
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments_nopk --target-dir /user/cloudera/departments -m 1
OR
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments_nopk --target-dir /user/cloudera/departments --split-by department_id

— import data by joining the source table
$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --query "select * from orders join order_items on orders.order_id=order_items.order_item_order_id where \$CONDITIONS" --split-by order_id --target-dir /user/cloudera/order_join --where "orders.order_id <> 0"
* --table cannot be given with --query
* $CONDITIONS is required because sqoop will append the conditions from --where there (or 'true' if no condition is given)
* --split-by is given because there is no primary key on the joined dataset

— import into HIVE Tables
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --hive-home /user/hive/warehouse --hive-import --hive-overwrite --hive-table sqoop_import.departments
* --hive-home is optional, as it is the default value
* --hive-table should include the db name followed by the table name, OR include --hive-database to give the db name separately
* There are two ways to import data into hive tables: one is to create the table first and then import into the existing table via --hive-table (the above query), and the other is to create the table during the import itself via --create-hive-table
* Hive import first downloads the data into a temp dir (the user's home dir, /user/cloudera/) and then loads it into the hive table, hence make sure a dir with the table name does not already exist in your home directory


Incremental Load


* In incremental loads, before importing we connect to a log table or log file to check for the delta condition (using sqoop eval or an IO API), then do the import, and update the log table/file after the import is successful so that the next incremental/delta load can look at it
* Incremental load can be done in two ways – one is using the --where argument, and the other is to use the out-of-the-box incremental options --incremental, --check-column and --last-value

#Option-1
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --append --target-dir /user/cloudera/sqoop_import/departments/ --where "department_id > 7"
* --append and --where work together in incremental loads. If --append is not given, the job will error out

#Option-2
$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --append --target-dir /user/cloudera/sqoop_import/departments/ --check-column department_id --incremental append --last-value 7
--append is required in this case as well
--check-column: the column against which the delta is evaluated
--last-value: the last value from which data has to be imported
--incremental: append/lastmodified
* --incremental append – used when there are only inserts into the SQL table (NO UPDATES)
* --incremental lastmodified – used when there are inserts and updates to the SQL table. To use this, we should have a date column in the table and --last-value should be a timestamp


Export data to a MySQL database from HDFS using Sqoop 


— Export HDFS data into new SQL table
sqoop export --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table order_items_export --export-dir /user/cloudera/sqoop_import/order_items
* --export-dir is the option to specify the HDFS directory from which the data is loaded into the MySQL table
* How do threads/mappers work in export? On import, sqoop issues as many parallel queries as there are mappers ('-m 12') and pulls data from the MySQL table into the cluster, since the RDBMS can serve them. On export, sqoop divides the HDFS data blocks among the mappers ('--num-mappers 12') and uploads the data in parallel

— Update/Merge HDFS data into existing SQL table
$ sqoop export --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --export-dir /user/cloudera/sqoop_import/departments_export/ --batch --update-key department_id --update-mode allowinsert
* --update-key is the primary key/unique key against which the update will happen. There has to be a primary key on the table for the above query to work, otherwise all records will be inserted (as duplicates). If there is a composite key, give comma-separated columns
* --update-mode: updateonly/allowinsert
updateonly – updates the existing record(s) and DOES NOT insert new records (DEFAULT MODE); all new records are ignored. So without passing the --update-mode argument, records can only be updated but new records cannot be inserted.
allowinsert – updates existing records and also inserts new records
* Without --update-key and --update-mode, the export works only in insert mode.


Change the delimiter and file format of data during import using Sqoop


— Change import delimiters on plain HDFS dir
$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --target-dir /user/cloudera/departments_enclosed --enclosed-by \" --fields-terminated-by \| --lines-terminated-by \\n --escaped-by \, --null-string \\N --null-non-string -1
* --enclosed-by: encloses every field in the data with this character
* --escaped-by: used to escape any special characters in the data (e.g., a , inside a csv field can throw off the number of columns in a record)
* --fields-terminated-by: field separator
* --lines-terminated-by: line separator
* --null-string: replacement for null in string columns
* --null-non-string: replacement for null in non-string (int, double, etc.) columns
* Defaults: MySQL's default delimiter set is used – fields: , lines: \n escaped-by: \ optionally-enclosed-by: ' (you get these either with the explicit --mysql-delimiters argument or by not passing any delimiter/format arguments at all)

— Change import delimiters on hive tables
A sqoop import using the --hive-import option will import the data using the default Hive delimiters, i.e., fields: CTRL+A and lines: \n
$sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --hive-home /user/hive/warehouse --hive-import --hive-overwrite --hive-table sqoop_import.departments_test --create-hive-table

— Change export delimiters
For export, all the delimiter arguments describing the HDFS input are prefixed with --input
* --input-enclosed-by: the character that encloses every field in the input data
* --input-escaped-by: used to escape any special characters in the input data (e.g., a , inside a csv field can throw off the number of columns in a record)
* --input-fields-terminated-by: field separator
* --input-lines-terminated-by: line separator
* --input-null-string: the null replacement used in string columns
* --input-null-non-string: the null replacement used in non-string (int, double, etc.) columns

But if we used non-default delimiters when we imported the data and want to export from that same imported directory, then we have to pass the matching --input-* arguments above, since those delimiters are recorded only in the generated java files (--outdir) from the import
$ sqoop export --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments_test --export-dir /user/hive/warehouse/sqoop_import.db/departments_test/ --input-fields-terminated-by \\001 --input-lines-terminated-by '\n' --input-null-string NULL --input-null-non-string -1

— file format of data during import
$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --target-dir /user/cloudera/departments --as-sequencefile
--as-sequencefile: stores the data in a binary (sequence file) format
$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --target-dir /user/cloudera/departments --as-avrodatafile
--as-avrodatafile: imports the data into the target dir and writes the Avro schema into the user's home dir.

  • The schema represents the table structure, columns and datatypes. It is generated with the convention sqoop_import_.avsc
    • hive> Create external table departments_avro ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as inputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' location '/user/cloudera/departments/' tblproperties('avro.schema.url'='/user/cloudera/departments.avsc');
  • Export has nothing to do with file formats

Sqoop Jobs and Sqoop Merge


A sqoop job is used to define a pre-defined job with all the required parameters, for the purpose of reuse
$ sqoop job --create import_job -- import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table departments --target-dir /user/cloudera/departments
* Note the '-- import' part of the command: there must be a space between -- and import

$sqoop job --list -> lists all the existing sqoop jobs
$sqoop job --show import_job -> shows the job details and definition
$sqoop job --exec import_job -> runs the job

— Merge
sqoop merge --merge-key department_id --new-data /user/cloudera/sqoop_merge/departments_delta --onto /user/cloudera/sqoop_merge/departments --target-dir /user/cloudera/sqoop_merge/staging --class-name departments.java --jar-file /tmp/sqoop-cloudera/compile/e11d28e872acd71c103d33fbf81ec5c7/departments.jar
* now remove the old dir ‘/user/cloudera/sqoop_merge/departments’
hdfs dfs -rm -R /user/cloudera/sqoop_merge/departments
* rename dir ‘/user/cloudera/sqoop_merge/staging’ to ‘/user/cloudera/sqoop_merge/departments’
hdfs dfs -mv /user/cloudera/sqoop_merge/staging /user/cloudera/sqoop_merge/departments

References:
https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html
https://www.youtube.com/channel/UCakdSIPsJqiOLqylgoYmwQg

Order by, Sort by, Distribute By, Cluster by in Hive

These four clauses provided by HIVE are very important in the context of Big Data, as they provide a way to achieve sorting on big datasets.

  • ORDER BY x: guarantees global ordering, but does this by pushing all data through just one reducer. This is basically unacceptable for large datasets. You end up with one sorted file as output.
  • SORT BY x: orders data at each of N reducers, but each reducer can receive overlapping ranges of data. You end up with N or more sorted files with overlapping ranges.
  • DISTRIBUTE BY x: ensures each of N reducers gets non-overlapping ranges of x, but doesn't sort the output of each reducer. You end up with N or more unsorted files with non-overlapping ranges.
  • CLUSTER BY x: ensures each of N reducers gets non-overlapping ranges, then sorts by those ranges at the reducers. This gives you global ordering, and is the same as doing (DISTRIBUTE BY x and SORT BY x). You end up with N or more sorted files with non-overlapping ranges.
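
A short illustration with a hypothetical logs(country, ts) table; the same clauses are also accepted by Spark SQL, which is how they are sketched here from spark-shell (Scala):

    // one globally sorted output
    spark.sql("SELECT * FROM logs ORDER BY country")

    // sorted within each reducer/partition only; ranges may overlap
    spark.sql("SELECT * FROM logs SORT BY country")

    // rows with the same country land in the same reducer/partition, but are not sorted there
    spark.sql("SELECT * FROM logs DISTRIBUTE BY country")

    // shorthand for DISTRIBUTE BY country + SORT BY country
    spark.sql("SELECT * FROM logs CLUSTER BY country")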

A more detailed explanation is given in the references below.

References:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy
http://stackoverflow.com/questions/13715044/hive-cluster-by-vs-order-by-vs-sort-by

best possible way to find the duplicates in an array

This is one of the most frequently asked questions in interviews. It can be done in many ways; the simplest is the brute-force approach with 2 nested for loops, with a complexity of O(n*n). But the interviewer looks for a solution with the best time complexity, and without using any of the existing data structures available in Java or other languages.

Now I am going to show you the approach with O(n) complexity at the cost of O(n) space. Full source code can be downloaded from here.

Step 1: Construct a simple linked-list node which holds the integer value.

    class MyInt {
		int value;
		MyInt next;
		
		public MyInt(int value) {
			this.value = value;
		}
	}

Step 2: Initialize the hash array (of the above node type) with the size of the input array.

hashArray = new MyInt[size];

Step 3: Loop through the input array and insert elements into the newly constructed hash array

  • In this method we construct the hashcode of each element and then find out the appropriate bucket (array index) in which the element has to be stored.
    public void findDuplicates(Integer currentElement) {
		int idx = getSupplementalHash(currentElement.hashCode()) % size;
		//int idx = (element.hashCode()) % size;
		MyInt existingElement = hashArray[idx];

		for(;existingElement != null; existingElement = existingElement.next) {
			if(existingElement.value == currentElement) {
				// duplicate
				System.out.println(currentElement+" this is a duplicate");
				return;
			} else {
				System.out.println("identified collision, adding "+
                                                   existingElement.value+" to the list");
			}
		}
		
		System.out.println("adding "+currentElement+" to the list");
		MyInt mi = new MyInt(currentElement);
		// insert element at the head to avoid tail traversing
		mi.next = hashArray[idx];
		hashArray[idx] = mi;
		
		System.out.println("------------------------------------");
	}

Example #1
Input Array: {1, 2, 3, 3, 4, 5, 6, 8, 8, 1, 2, 8}
Output:

3 is a duplicate
8 is a duplicate
1 is a duplicate
2 is a duplicate
8 is a duplicate

But there is a catch here: if the hashcode is not properly implemented by the user, the complexity might go up to O(n*n); see the example below

Example #2: In findDuplicates() method above, comment line #2 and uncomment line #3
Input array: {10,0,100,20,20}
Output

Input array size is 5
adding 10 to the list
------------------------------------
identified collision, adding 10 to the list
adding 0 to the list
------------------------------------
identified collision, adding 0 to the list
identified collision, adding 10 to the list
adding 100 to the list
------------------------------------
identified collision, adding 100 to the list
identified collision, adding 0 to the list
identified collision, adding 10 to the list
adding 20 to the list
------------------------------------
20 is a duplicate

If you look at the above output, to find whether 20 is a duplicate it traversed 4 nodes, which is O(n) for a single element, and hence the overall complexity goes back to O(n*n).

Disadvantage: the downside of this approach is that when the hashcodes are not well distributed (all ending with 0, all even, all odd, etc.), the elements all go into one bucket, and that bucket degenerates into a plain linked list (with O(n) lookup) instead of a hash array (with O(1) lookup). Because of this, our target complexity of O(n) is lost.

To address this, we re-compute the hash code again at our end to make sure it distributes well across the indexes; here is the code

    private int getSupplementalHash(int h) {
		// This function ensures that hashCodes that differ only by
		// constant multiples at each bit position have a bounded
		// number of collisions (approximately 8 at default load factor).
		h ^= (h >>> 20) ^ (h >>> 12);
		return h ^ (h >>> 7) ^ (h >>> 4);
	}

Example #2 (run with the getSupplementalHash() method): In the findDuplicates() method above, comment line #3 and uncomment line #2
Output

Input array size is 5
adding 10 to the list
------------------------------------
identified collision, adding 10 to the list
adding 0 to the list
------------------------------------
adding 100 to the list
------------------------------------
adding 20 to the list
------------------------------------
20 is a duplicate

If you notice, the collisions are reduced to a great extent.

Full source code can be downloaded from here.
