
Falcon Data Import and Export

Overview

Falcon provides constructs to periodically bring raw data from external data sources (such as databases, drop boxes, etc.) onto Hadoop and to push derived data computed on Hadoop onto external data sources.

As of this release, Falcon supports only relational databases (e.g. Oracle, MySQL) via JDBC as external data sources. Future releases will add support for other types of external data sources.

Prerequisites

The following are the prerequisites for importing data from and exporting data to external databases.

  • Sqoop 1.4.6+
  • Oozie 4.2.0+
  • Appropriate database connector

Note: Falcon uses Sqoop for import/export operations. Sqoop requires the appropriate database driver to connect to the relational database. Please refer to the Sqoop documentation for any Sqoop-related questions. Make sure the database driver jar is copied into the Oozie share lib for Sqoop.

For example, in order to import from and export to MySQL, please make sure the latest MySQL connector jar, mysql-connector-java-5.1.31.jar, is copied into Oozie's Sqoop share lib:

/user/oozie/share/lib/{lib-dir}/sqoop/mysql-connector-java-5.1.31.jar

where the {lib-dir} value varies across Oozie deployments.
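
For example, a minimal sketch of copying the connector into the Sqoop share lib and refreshing it, assuming an HDFS-backed Oozie share lib; the lib_20150721010816 directory and the Oozie URL are placeholders to adjust for your deployment:

    ## list the current Sqoop share lib contents to find the {lib-dir} in use
    oozie admin -oozie http://oozie-host:11000/oozie -shareliblist sqoop

    ## copy the MySQL connector jar into the Sqoop share lib on HDFS
    hdfs dfs -put mysql-connector-java-5.1.31.jar \
        /user/oozie/share/lib/lib_20150721010816/sqoop/

    ## refresh the share lib so Oozie picks up the new jar
    oozie admin -oozie http://oozie-host:11000/oozie -sharelibupdate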


Usage

Entity Definition and Setup

  • Datasource Entity
The Datasource entity abstracts the connection and credential details for external data sources. It supports read and write interfaces with specific credentials; the default credential is used if the read or write interface does not have its own credentials. In general, the Datasource entity will be defined by a system administrator. Please refer to the datasource XSD for more details.

The following example defines a Datasource entity for a MySQL database. The import operation will use the read interface with the URL "jdbc:mysql://dbhost/test", user name "import_usr", and password text "sqoop", whereas the export operation will use the write interface with the same URL, user name "export_usr", and a password stored in an HDFS file at "/user/ambari-qa/password-store/password_write_user".

The default credential specified will be used if either the read or write interface does not provide its own credentials. The default credential specifies the password using the password alias feature available via Hadoop's credential provider functionality. Users can create a password alias using the "hadoop credential create <alias> -provider <provider-path>" command, where <alias> is a string and <provider-path> is an HDFS jceks file. At runtime, the specified alias is used to look up the password, which is stored encrypted in the jceks file on HDFS specified under the providerPath element (an example command is shown after the entity definition below).

The available read and write interfaces enable database administrators to segregate read and write workloads.


      File: mysql-database.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <datasource colo="west-coast" description="MySQL database on west coast" type="mysql" name="mysql-db" xmlns="uri:falcon:datasource:0.1">
          <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
          <interfaces>
              <!-- ***** read interface ***** -->
              <interface type="readonly" endpoint="jdbc:mysql://dbhost/test">
                  <credential type="password-text">
                      <userName>import_usr</userName>
                      <passwordText>sqoop</passwordText>
                  </credential>
              </interface>

              <!-- ***** write interface ***** -->
              <interface type="write"  endpoint="jdbc:mysql://dbhost/test">
                  <credential type="password-file">
                      <userName>export_usr</userName>
                      <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
                  </credential>
              </interface>

              <!-- *** default credential *** -->
              <credential type="password-alias">
                <userName>sqoop2_user</userName>
                <passwordAlias>
                    <alias>sqoop.password.alias</alias>
                    <providerPath>hdfs://namenode:8020/user/ambari-qa/sqoop_password.jceks</providerPath>
                </passwordAlias>
              </credential>

          </interfaces>

          <driver>
              <clazz>com.mysql.jdbc.Driver</clazz>
              <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/mysql-connector-java-5.1.31.jar</jar>
          </driver>
      </datasource>
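
For the default credential above, the password alias can be created ahead of time with Hadoop's credential CLI. A minimal sketch, assuming the alias and keystore match the passwordAlias element in the entity (the command prompts for the password value to store):

      ## create the alias referenced by the default credential; the
      ## jceks://hdfs@... URI points at the same file as providerPath
      hadoop credential create sqoop.password.alias \
          -provider jceks://hdfs@namenode:8020/user/ambari-qa/sqoop_password.jceks

      ## verify the alias is present in the keystore
      hadoop credential list \
          -provider jceks://hdfs@namenode:8020/user/ambari-qa/sqoop_password.jceks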
      

  • Feed Entity
The Feed entity now enables users to define IMPORT and EXPORT policies in addition to RETENTION and REPLICATION. The IMPORT and EXPORT policies refer to an already defined Datasource entity for connection and credential details and operate on the table name specified in the policy. Please refer to the feed entity XSD for details.

The following example defines a Feed entity with IMPORT and EXPORT policies. Both the IMPORT and EXPORT operations refer to the Datasource entity "mysql-db". The IMPORT operation will use the read interface and credentials, while the EXPORT operation will use the write interface and credentials. A feed instance is created every hour since the frequency of the Feed is hours(1), and the Feed instances are deleted after 90 days because of the retention policy.

The feed's data location should contain some combination of time partitions (such as ${YEAR}, ${MONTH}, ${DAY}, ${HOUR}, ${MINUTE}) if an import or export policy is associated with the feed.


      File: customer_email_feed.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <!--
       A feed representing Hourly customer email data retained for 90 days
       -->
      <feed description="Raw customer email feed" name="customer_feed" xmlns="uri:falcon:feed:0.1">
          <tags>externalSystem=USWestEmailServers,classification=secure</tags>
          <groups>DataImportPipeline</groups>
          <frequency>hours(1)</frequency>
          <late-arrival cut-off="hours(4)"/>
          <clusters>
              <cluster name="primaryCluster" type="source">
                  <validity start="2015-12-15T00:00Z" end="2016-03-31T00:00Z"/>
                  <retention limit="days(90)" action="delete"/>
                  <import>
                      <source name="mysql-db" tableName="simple">
                          <extract type="full">
                              <mergepolicy>snapshot</mergepolicy>
                          </extract>
                          <fields>
                              <includes>
                                  <field>id</field>
                                  <field>name</field>
                              </includes>
                          </fields>
                      </source>
                      <arguments>
                          <argument name="--split-by" value="id"/>
                          <argument name="--num-mappers" value="2"/>
                      </arguments>
                  </import>
                  <export>
                        <target name="mysql-db" tableName="simple_export">
                            <load type="insert"/>
                            <fields>
                              <includes>
                                <field>id</field>
                                <field>name</field>
                              </includes>
                            </fields>
                        </target>
                        <arguments>
                             <argument name="--update-key" value="id"/>
                        </arguments>
                    </export>
              </cluster>
          </clusters>

          <locations>
              <location type="data" path="/user/ambari-qa/falcon/demo/primary/importfeed/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}"/>
              <location type="stats" path="/none"/>
              <location type="meta" path="/none"/>
          </locations>

          <ACL owner="ambari-qa" group="users" permission="0755"/>
          <schema location="/none" provider="none"/>

      </feed>
      

  • Import policy
The import policy uses the datasource entity specified in the "source" element to connect to the database. The specified tableName should exist in the source datasource.

Extraction type specifies whether to pull data from the external datasource in "full" every time or "incrementally". The mergepolicy specifies how to organize the data on Hadoop (snapshot, or append as time-series partitions). The valid combinations are:

    • [full, snapshot] - data is extracted in full and dumped into the feed instance location.
    • [incremental, append] - data is extracted incrementally using the key specified in the deltacolumn and added as a partition to the feed instance location.
    • [incremental, snapshot] - data is extracted incrementally and merged with the data already on Hadoop to produce one latest feed instance. This feature is not currently supported. The use case for this feature is to efficiently import very large dimension tables that have updates and inserts onto Hadoop and make them available as a snapshot with the latest updates to consumers.

The following example defines an incremental extraction with append organization:

           <import>
                <source name="mysql-db" tableName="simple">
                    <extract type="incremental">
                        <deltacolumn>modified_time</deltacolumn>
                        <mergepolicy>append</mergepolicy>
                    </extract>
                    <fields>
                        <includes>
                            <field>id</field>
                            <field>name</field>
                        </includes>
                    </fields>
                </source>
                <arguments>
                    <argument name="--split-by" value="id"/>
                    <argument name="--num-mappers" value="2"/>
                </arguments>
            </import>
        

The fields option controls which fields get imported. By default, all fields are imported. The "includes" option imports only the specified fields, while the "excludes" option imports all fields other than those specified.
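
For instance, a sketch of an import that uses "excludes" instead, assuming a hypothetical audit_log column that should not be imported (the excludes element mirrors the includes structure shown earlier):

           <import>
                <source name="mysql-db" tableName="simple">
                    <extract type="full">
                        <mergepolicy>snapshot</mergepolicy>
                    </extract>
                    <fields>
                        <!-- hypothetical column excluded from the import -->
                        <excludes>
                            <field>audit_log</field>
                        </excludes>
                    </fields>
                </source>
            </import>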

The arguments section enables passing any extra arguments needed for fine control over the underlying implementation -- in this case, Sqoop.

  • Export policy
The export policy, like the import policy, uses the datasource entity for connecting to the database. The load type specifies whether to insert data into, or only update data in, the external table. The fields option behaves the same way as in the import policy. The specified tableName should exist in the external datasource.

Operation

Once the Datasource entity and the Feed entity with import and export policies are defined, users can submit and schedule the import and export operations via the CLI and REST API as shown below:


    ## submit the mysql-db datasource defined in the file mysql_datasource.xml
    falcon entity -submit -type datasource -file mysql_datasource.xml

    ## submit the customer_feed specified in the customer_email_feed.xml
    falcon entity -submit -type feed -file customer_email_feed.xml

    ## schedule the customer_feed
    falcon entity -schedule -type feed -name customer_feed

   

Falcon will create the corresponding Oozie bundles with coordinators and workflows for the import and export operations.
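
Once scheduled, the standard Falcon CLI can also be used to monitor the feed; a minimal sketch using the entity from the example above (the time window is illustrative and should fall within the feed's validity range):

    ## check the status of the scheduled feed entity
    falcon entity -status -type feed -name customer_feed

    ## check the status of individual feed instances (the import/export runs)
    falcon instance -status -type feed -name customer_feed \
        -start 2015-12-15T00:00Z -end 2015-12-16T00:00Z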