Tuesday, February 8, 2011

DataStage Parallel Processing

Posted by Venkat ♥ Duvvuri 8:30 AM, under | No comments

Following figure represents one of the simplest jobs you could have — a data source,
a Transformer (conversion) stage, and the data target. The links between the stages represent the flow of data into or out of a stage. In a parallel job, each stage would normally (but not always) correspond to a process. You can have multiple instances of each process to run on the available processors in your system.


A parallel DataStage job incorporates two basic types of parallel processing — pipeline and partitioning. Both of these methods are used at runtime by the Information Server engine to execute the simple job shown in Figure 1-8. To the DataStage developer, this job would appear the same on your Designer canvas, but you can optimize it through advanced properties.

Pipeline parallelism
In the following example, all stages run concurrently, even in a single-node configuration. As data is read from the Oracle source, it is passed to the Transformer stage for transformation, where it is then passed to the DB2 target. Instead of waiting for all source data to be read, as soon as the source data stream starts to produce rows, these are passed to the subsequent stages. This method is called pipeline parallelism, and all three stages in our example operate simultaneously regardless of the degree of parallelism of the configuration file. The Information Server Engine always executes jobs with pipeline parallelism.

If you ran the example job on a system with multiple processors, the stage reading would start on one processor and start filling a pipeline with the data it had read. The transformer stage would start running as soon as there was data in the pipeline, process it and start filling another pipeline. The stage writing the transformed data to the target database would similarly start writing as soon as there was data available. Thus all three stages are operating simultaneously.


Partition parallelism
When large volumes of data are involved, you can use the power of parallel processing to your best advantage by partitioning the data into a number of separate sets, with each partition being handled by a separate instance of the job stages. Partition parallelism is accomplished at runtime, instead of a manual process that would be required by traditional systems.

The DataStage developer only needs to specify the algorithm to partition the data, not the degree of parallelism or where the job will execute. Using partition parallelism the same job would effectively be run simultaneously by several processors, each handling a separate subset of the total data. At the end of the job the data partitions can be collected back together again and written to a single data source. This is shown in following figure.


Attention: You do not need multiple processors to run in parallel. A single processor is capable of running multiple concurrent processes.


Partition parallelism [Combining pipeline and partition parallelism]
The Information Server engine combines pipeline and partition parallel processing to achieve even greater performance gains. In this scenario you would have stages processing partitioned data and filling pipelines so the next one could start on that partition before the previous one had finished. This is shown in the following figure.


In some circumstances you might want to actually re-partition your data between stages. This could happen, for example, where you want to group data differently. Suppose that you have initially processed data based on customer last name, but now you want to process on data grouped by zip code. You will have to re-partition to ensure that all customers sharing the same zip code are in the same group. DataStage allows you to re-partition between stages as and when necessary. With the Information Server engine, re-partitioning happens in memory between stages, instead of writing to disk.

Thursday, February 3, 2011

DataStage Best Practices

Posted by Venkat ♥ Duvvuri 8:57 PM, under | No comments

This section provides an overview of recommendations for standard practices.
The recommendations are categorized as follows:
* Standards
* Development guidelines
* Component usage
* DataStage Data Types
* Partitioning data
* Collecting data
* Sorting
* Stage specific guidelines

Standards

It is important to establish and follow consistent standards in:
* Directory structures for installation and application support directories.
* Naming conventions, especially for DataStage Project categories, stage names, and links.
All DataStage jobs should be documented with the Short Description field, as well as Annotation fields.
It is the DataStage developer’s responsibility to make personal backups of their work on their local workstation, using DataStage's DSX export capability. This can also be used for integration with source code control systems.
Note: A detailed discussion of these practices is beyond the scope of this Redbooks publication, and you should speak to your Account Executive to engage IBM IPS Services.

Development guidelines

Modular development techniques should be used to maximize re-use of DataStage jobs and components:
* Job parameterization allows a single job design to process similar logic instead of creating multiple copies of the same job. The Multiple-Instance job property allows multiple invocations of the same job to run simultaneously.
* A set of standard job parameters should be used in DataStage jobs for source and target database parameters (DSN, user, password, etc.) and directories where files are stored. To ease re-use, these standard parameters and settings should be made part of a Designer Job Parameter Sets.
* Create a standard directory structure outside of the DataStage project directory for source and target files, intermediate work files, and so forth.
* Where possible, create re-usable components such as parallel shared containers to encapsulate frequently-used logic.
* DataStage Template jobs should be created with:
– Standard parameters such as source and target file paths, and database login properties
– Environment variables and their default settings
– Annotation blocks
* Job Parameters should always be used for file paths, file names, database login settings.
* Standardized Error Handling routines should be followed to capture errors and rejects.

Component usage

The following guidelines should be followed when constructing parallel jobs in IBM InfoSphere DataStage Enterprise Edition:
* Never use Server Edition components (BASIC Transformer, Server Shared Containers) within a parallel job. BASIC Routines are appropriate only for job control sequences.
* Always use parallel Data Sets for intermediate storage between jobs unless that specific data also needs to be shared with other applications.
* Use the Copy stage as a placeholder for iterative design, and to facilitate default type conversions.
* Use the parallel Transformer stage (not the BASIC Transformer) instead of the Filter or Switch stages.
Chapter 1. IBM InfoSphere DataStage overview 29
* Use BuildOp stages only when logic cannot be implemented in the parallel Transformer.

DataStage data types

The following guidelines should be followed with DataStage data types:
* Be aware of the mapping between DataStage (SQL) data types and the internal DS/EE data types. If possible, import table definitions for source databases using the Orchestrate Schema Importer (orchdbutil) utility.
* Leverage default type conversions using the Copy stage or across the Output mapping tab of other stages.

Partitioning data

In most cases, the default partitioning method (Auto) is appropriate. With Auto partitioning, the Information Server Engine will choose the type of partitioning at runtime based on stage requirements, degree of parallelism, and source and target systems. While Auto partitioning will generally give correct results, it might not give optimized performance. As the job developer, you have visibility into requirements, and can optimize within a job and across job flows. Given the numerous options for keyless and keyed partitioning, the following objectives form a methodology for assigning partitioning:

Objective 1
Choose a partitioning method that gives close to an equal number of rows in each partition, while minimizing overhead. This ensures that the processing workload is evenly balanced, minimizing overall run time.

Objective 2
The partition method must match the business requirements and stage functional requirements, assigning related records to the same partition if required. Any stage that processes groups of related records (generally using one or more key columns) must be partitioned using a keyed partition method. This includes, but is not limited to: Aggregator, Change Capture, Change Apply, Join, Merge, Remove Duplicates, and Sort stages. It might also be necessary for Transformers and BuildOps that process groups of related records.

Objective 3
Unless partition distribution is highly skewed, minimize re-partitioning, specially in cluster or Grid configurations. Re-partitioning data in a cluster or Grid configuration incurs the overhead of network transport.

Objective 4
Partition method should not be overly complex. The simplest method that meets the above objectives will generally be the most efficient and yield the best performance. Using the above objectives as a guide, the following methodology can be
applied:
a. Start with Auto partitioning (the default).
b. Specify Hash partitioning for stages that require groups of related records
as follows:
• Specify only the key column(s) that are necessary for correct grouping as long as the number of unique values is sufficient
• Use Modulus partitioning if the grouping is on a single integer key column
• Use Range partitioning if the data is highly skewed and the key column values and distribution do not change significantly over time (Range Map can be reused)
c. If grouping is not required, use Round Robin partitioning to redistribute data equally across all partitions.
• Especially useful if the input Data Set is highly skewed or sequential
d. Use Same partitioning to optimize end-to-end partitioning and to minimize re-partitioning
• Be mindful that Same partitioning retains the degree of parallelism of the upstream stage
• Within a flow, examine up-stream partitioning and sort order and attempt to preserve for down-stream processing. This may require re-examining key column usage within stages and re-ordering stages within a flow (if business requirements permit).
Note: In satisfying the requirements of this second objective, it might not be possible to choose a partitioning method that gives an almost equal number of rows in each partition.

Across jobs, persistent Data Sets can be used to retain the partitioning and sort
order. This is particularly useful if downstream jobs are run with the same degree
of parallelism (configuration file) and require the same partition and sort order.


Collecting data

Given the options for collecting data into a sequential stream, the following guidelines form a methodology for choosing the appropriate collector type:
1. When output order does not matter, use Auto partitioning (the default).
2. Consider how the input Data Set has been sorted:
– When the input Data Set has been sorted in parallel, use Sort Merge collector to produce a single, globally sorted stream of rows.
– When the input Data Set has been sorted in parallel and Range partitioned, the Ordered collector might be more efficient.
3. Use a Round Robin collector to reconstruct rows in input order for round-robin partitioned input Data Sets, as long as the Data Set has not been re-partitioned or reduced.

Sorting

Apply the following methodology when sorting in an IBM InfoSphere DataStage Enterprise Edition data flow:
1. Start with a link sort.
2. Specify only necessary key column(s).
3. Do not use Stable Sort unless needed.
4. Use a stand-alone Sort stage instead of a Link sort for options that are not available on a Link sort:
– The “Restrict Memory Usage” option should be included here. If you want more memory available for the sort, you can only set that via the Sort Stage — not on a sort link. The environment variable $APT_TSORT_STRESS_BLOCKSIZE can also be used to set sort memory usage (in MB) per partition.
– Sort Key Mode, Create Cluster Key Change Column, Create Key Change Column, Output Statistics.
– Always specify “DataStage” Sort Utility for standalone Sort stages.
– Use the “Sort Key Mode=Don’t Sort (Previously Sorted)” to resort a sub-grouping of a previously-sorted input Data Set.
5. Be aware of automatically-inserted sorts:
– Set $APT_SORT_INSERTION_CHECK_ONLY to verify but not establish required sort order.
6. Minimize the use of sorts within a job flow.
7. To generate a single, sequential ordered result set, use a parallel Sort and a
Sort Merge collector.


Stage specific guidelines

The guidelines by stage are as follows:

Transformer
Take precautions when using expressions or derivations on nullable columns within the parallel Transformer:
– Always convert nullable columns to in-band values before using them in an expression or derivation.
– Always place a reject link on a parallel Transformer to capture / audit possible rejects.

Lookup
It is most appropriate when reference data is small enough to fit into available shared memory. If the Data Sets are larger than available memory resources, use the Join or Merge stage. Limit the use of database Sparse Lookups to scenarios where the number of input rows is significantly smaller (for example 1:100 or more) than the number of reference rows, or when exception processing.

Join
Be particularly careful to observe the nullability properties for input links to any form of Outer Join. Even if the source data is not nullable, the non-key columns must be defined as nullable in the Join stage input in order to identify unmatched records.

Aggregators
Use Hash method Aggregators only when the number of distinct key column values is small. A Sort method Aggregator should be used when the number of distinct key values is large or unknown.

Database Stages
The following guidelines apply to database stages:
– Where possible, use the Connector stages or native parallel database stages for maximum performance and scalability.
– The ODBC Connector and ODBC Enterprise stages should only be used when a native parallel stage is not available for the given source or target database.
– When using Oracle, DB2, or Informix databases, use Orchestrate Schema Importer (orchdbutil) to properly import design metadata.
– Take care to observe the data type mappings.
– If possible, use an SQL where clause to limit the number of rows sent to a
DataStage job.
– Avoid the use of database stored procedures on a per-row basis within a high-volume data flow. For maximum scalability and parallel performance, it is best to implement business rules natively using DataStage parallel components.