Spark – debug Spark code faster

  1. Use count() to call actions on intermediary RDDs/DataFrames.

    While it’s great that Spark follows a lazy computation model so it doesn’t compute anything until necessary, the downside is that when you do get an error, it may not be clear exactly where in your code the error appeared. Therefore, factor your code so that you can store intermediary RDDs / DataFrames in variables. When debugging, call count() on those RDDs / DataFrames to see at which stage your error occurred. This is a useful tip not just for errors, but also for optimizing the performance of your Spark jobs: it lets you measure the running time of each individual stage and optimize accordingly (see the sketch after this list).

  2. Working around bad input.

    When working with large datasets, you will have bad input that is malformed or not what you expect. Decide proactively, for your use case, whether you can drop bad input, whether you want to try fixing and recovering it, or whether you need to investigate why your input data is bad. A filter() is a great way to keep only your good input points, or to isolate the bad input if you want to look into it more and debug. If you want to fix records where possible and drop the ones you cannot, a flatMap() operation is a great way to accomplish that (see the sketch after this list).

  3. Use the debugging tools in Databricks notebooks.

    The Databricks notebook is the most effective tool for Spark code development and debugging. When you compile code into a JAR and then submit it to a Spark cluster, your whole data pipeline becomes a bit of a black box that is slow to iterate on. The notebooks allow you to isolate and find the parts of your data pipeline that are buggy or slow, and they also let you quickly try different fixes. In Databricks, there are a number of additional built-in features that make debugging easy; they are described below.
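
Before getting into those features, here is a minimal sketch tying tips 1 and 2 together. It is written in Scala against a hypothetical pipeline: the path, the column names, and the idea of parsing an amount field are assumptions, and spark is the SparkSession a Databricks notebook already provides.

import spark.implicits._   // `spark` is the notebook's SparkSession

// Tip 1: keep each stage of the pipeline in its own variable...
val rawDF    = spark.read.json("/mnt/data/events.json")   // hypothetical input
val parsedDF = rawDF.select("userId", "amount")
val goodDF   = parsedDF.filter($"amount".isNotNull)        // tip 2: keep the good rows
val badDF    = parsedDF.filter($"amount".isNull)           // ...or isolate the bad ones to inspect

// ...so that calling count() pinpoints the stage where an error (or slowdown) appears.
rawDF.count()      // fails here  -> problem reading the input
parsedDF.count()   // fails here  -> problem in the select/parse step
goodDF.count()

// Tip 2: flatMap can repair a record when possible and drop it otherwise --
// return one value for usable rows and an empty Seq for rows you give up on.
val amounts = parsedDF.flatMap { row =>
  scala.util.Try(row.getAs[Any]("amount").toString.trim.toDouble).toOption.toSeq
}
amounts.count()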

Commenting

Other users in your organization can comment on your code and suggest improvements. You can even do code reviews directly within notebooks, or simply share comments on the features.

Version Control

Databricks notebooks have two types of version control. The first is the traditional method of syncing your notebooks directly to GitHub.

The other is a history of what your notebook looked like at previous points in time, which lets you revert to an older version with the click of a button.

Condensed view of the number of partitions.

When you are running a Spark job, you can drill down to see the jobs and stages needed to run it and how far along they are. In one example workload, Stage 11 had 200 partitions, of which 42 had completed and 8 were currently running. If a stage like this were really slow, a larger Spark cluster would allow you to run more of the partitions at once and make the overall job finish faster.

Spark UI Popout

If you click the “View” link for a job, the whole Spark UI pops up for you to debug with. Tip 4 below covers the Spark UI, and we also did a blog post on this feature; check it out for more details.

 

Tip 4: Understanding how to debug with the Databricks Spark UI.

The Spark UI contains a wealth of information you can use for debugging your Spark jobs. There are a bunch of great visualizations, and we have a blog post here about those features.

Generally, I find the Spark UI intuitive to use. The one issue I do see is that when a job fails, users sometimes look only at the error that is surfaced in the notebook cell. When a Spark stage has a ton of tasks, your whole job will fail if even a single task consistently fails. So I advise drilling all the way down to the task page, sorting the page by status, and examining the “Errors” column for the tasks that failed. You’ll find a detailed error message there.

Tip 5: Scale up Spark jobs slowly for really large datasets.

If you have a really large dataset to analyze and you run into errors, you may want to debug/test on a portion of your dataset first, and then go back to the full dataset once that runs smoothly. A smaller dataset makes it quick to reproduce errors, understand the characteristics of your data at various stages of the pipeline, and more. Note – you can definitely run into more problems with the larger dataset; the hope is just that an error you can reproduce at a smaller scale is easier to fix than one that requires the full dataset.
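
One way to carve out that smaller slice is to sample the input and cache it while you iterate. A rough Scala sketch; the path, fraction, and seed are assumptions, not from the original tip:

val fullDF = spark.read.parquet("/mnt/data/full")   // hypothetical full dataset
val devDF  = fullDF.sample(withReplacement = false, fraction = 0.01, seed = 42)

devDF.cache()   // keep the small slice in memory while you debug
devDF.count()   // materialize it once

// Develop and debug against devDF, then swap fullDF back in for the real run.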

Tip 6: Reproduce errors or slow Spark jobs using Databricks Jobs.

As with any bug, having a reliable reproduction is half the effort of solving it. For that, I recommend reproducing errors and slow Spark jobs using the Databricks Jobs feature. This helps you capture the conditions for the bug/slowness and understand how flaky the bug is. You’ll also capture the output permanently to look at – including the running time of each individual cell, the output of each cell, and any error messages. And since the Jobs feature keeps a history UI, you can view the log files and the Spark UI even after your cluster has been shut down. You can also experiment with different Spark versions, instance types, cluster sizes, alternate ways to write your code, etc. in a very controlled environment to figure out how they affect your Spark job.

 

Tip 7: Examine the partitioning for your dataset.

While Spark chooses reasonable defaults for your data, if your Spark job runs out of memory or runs slowly, bad partitioning could be at fault. I usually start by trying different partition counts to see how they affect the job.

If your dataset is large, you can try repartitioning to a larger number to allow more parallelism in your job. A good indication of this in the Spark UI is that you don’t have many tasks, but each task is very slow to complete.

myDF.repartition(100)

On the other hand, if you don’t have that much data and you have a ton of partitions, the overhead of having too many partitions can also cause your job to be slow. You can repartition to a smaller number, but coalesce may be faster since it tries to combine partitions on the same machines rather than shuffle your data around again.

myDF.coalesce(10)

If you are using Spark SQL, you can set the number of partitions used for shuffle steps with spark.sql.shuffle.partitions.

%sql set spark.sql.shuffle.partitions=1000
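
A small sketch of how those knobs fit together in Scala, reusing the myDF name from above (the numbers are illustrative). Note that repartition and coalesce return a new DataFrame rather than modifying myDF in place, so assign the result:

println(s"current partitions: ${myDF.rdd.getNumPartitions}")   // see what you have now

val widerDF    = myDF.repartition(100)   // full shuffle; more partitions, more parallelism
val narrowerDF = myDF.coalesce(10)       // combines existing partitions, avoids a full shuffle

// The Scala equivalent of the %sql setting above:
spark.conf.set("spark.sql.shuffle.partitions", "1000")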

 
Cassandra – key components for configuring

Gossip

A peer-to-peer communication protocol to discover and share location and state information about the other nodes in a Cassandra cluster. Gossip information is also persisted locally by each node to use immediately when a node restarts.

Partitioner

A partitioner determines which node will receive the first replica of a piece of data, and how to distribute other replicas across other nodes in the cluster.

Each row of data is uniquely identified by a primary key, which may be the same as its partition key, but which may also include other clustering columns.

A partitioner is a hash function that derives a token from the primary key of a row. The partitioner uses the token value to determine which nodes in the cluster receive the replicas of that row.

The Murmur3Partitioner is the default partitioning strategy for new Cassandra clusters and the right choice for new clusters in almost all cases.

You must set the partitioner and assign each node a num_tokens value. The number of tokens you assign depends on the hardware capabilities of the system. If you are not using virtual nodes (vnodes), use the initial_token setting instead.

Replication factor

All replicas are equally important; there is no primary or master replica. You define the replication factor for each datacenter. Generally, you should set the replication factor greater than one, but no more than the number of nodes in the cluster.

Replica placement strategy

Cassandra stores copies (replicas) of data on multiple nodes to ensure reliability and fault tolerance. A replication strategy determines which nodes to place replicas on. The first replica of data is simply the first copy; it is not unique in any sense. The NetworkTopologyStrategy is highly recommended for most deployments.

When creating a keyspace, you must define the replica placement strategy and the number of replicas you want.
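
For example, both choices land in the keyspace definition. A minimal sketch in Scala using the DataStax Java driver (3.x); the contact point, keyspace name, datacenter names, and replication factors are all assumptions:

import com.datastax.driver.core.Cluster

val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()   // any reachable node
val session = cluster.connect()

// NetworkTopologyStrategy is the placement strategy; the per-datacenter
// numbers are the replication factors.
session.execute(
  """CREATE KEYSPACE IF NOT EXISTS my_app
    |  WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3, 'dc2': 2}""".stripMargin)

session.close()
cluster.close()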

Snitch

A snitch defines groups of machines into datacenters and racks (the topology) that the replication strategy uses to place replicas.

You must configure a snitch when you create a cluster. All snitches use a dynamic snitch layer, which monitors performance and chooses the best replica for reading. It is enabled by default and recommended for use in most deployments. Configure dynamic snitch thresholds for each node in the cassandra.yaml configuration file. The GossipingPropertyFileSnitch is recommended for production.

The cassandra.yaml configuration file

The main configuration file for setting the initialization properties for a cluster, caching parameters for tables, properties for tuning and resource utilization, timeout settings, client connections, backups, and security.

By default, a node is configured to store the data it manages in a directory set in the cassandra.yaml file.

In a production cluster deployment, you can change the commitlog_directory to a different disk drive from the data_file_directories.

System keyspace table properties

You set storage configuration attributes on a per-keyspace or per-table basis programmatically or using a client application, such as CQL.

Cassandra – key structures

  • Node

    Where you store your data. It is the basic infrastructure component of Cassandra.

  • Datacenter

    A collection of related nodes. A datacenter can be a physical datacenter or a virtual datacenter. Different workloads should use separate datacenters, either physical or virtual. Replication is set by datacenter. Using separate datacenters prevents Cassandra transactions from being impacted by other workloads and keeps requests close to each other for lower latency. Depending on the replication factor, data can be written to multiple datacenters. Datacenters must never span physical locations.

  • Cluster

    A cluster contains one or more datacenters. It can span physical locations.

  • Commit log

    All data is written first to the commit log for durability. After all of its data has been flushed to SSTables, the commit log can be archived, deleted, or recycled.

  • SSTable

    A sorted string table (SSTable) is an immutable data file to which Cassandra writes memtables periodically. SSTables are append-only, stored sequentially on disk, and maintained for each Cassandra table.

  • CQL Table

    A collection of ordered columns fetched by table row. A table consists of columns and has a primary key (see the sketch after this list).
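
As a sketch of that last point (and of the partition key / clustering column distinction from the partitioner section above), here is a hypothetical table created from Scala with the DataStax Java driver; every name in it is made up:

import com.datastax.driver.core.Cluster

val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()

// user_id is the partition key (it feeds the partitioner); event_time is a
// clustering column that orders rows within each partition.
session.execute(
  """CREATE TABLE IF NOT EXISTS my_app.events (
    |  user_id    uuid,
    |  event_time timestamp,
    |  payload    text,
    |  PRIMARY KEY (user_id, event_time)
    |)""".stripMargin)

session.close()
cluster.close()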

Cassandra – Architecture

Cassandra addresses the problem of failures by employing a peer-to-peer distributed system across homogeneous nodes where data is distributed among all nodes in the cluster.

  • Each node frequently exchanges state information about itself and other nodes across the cluster using a peer-to-peer gossip communication protocol.
  • A sequentially written commit log on each node captures write activity to ensure data durability.
  • Data is then indexed and written to an in-memory structure, called a memtable, which resembles a write-back cache.
  • Each time the memory structure is full, the data is written to disk in an SSTable data file.
  • All writes are automatically partitioned and replicated throughout the cluster.
  • Cassandra periodically consolidates SSTables using a process called compaction, discarding obsolete data marked for deletion with a tombstone.
  • To ensure all data across the cluster stays consistent, various repair mechanisms are employed.
  • Cassandra is a partitioned row store database, where rows are organized into tables with a required primary key.
  • Cassandra’s architecture allows any authorized user to connect to any node in any datacenter and access data using the CQL language.
  • For ease of use, CQL uses a similar syntax to SQL and works with table data. Developers can access CQL through cqlsh, DevCenter, and via drivers for application languages. Typically, a cluster has one keyspace per application composed of many different tables.
  • Client read or write requests can be sent to any node in the cluster.
  • When a client connects to a node with a request, that node serves as the coordinator for that particular client operation.
  • The coordinator acts as a proxy between the client application and the nodes that own the data being requested. The coordinator determines which nodes in the ring should get the request based on how the cluster is configured (see the sketch below).
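
A small sketch of those last points in Scala with the DataStax Java driver: the client can hand its request to any contact point, and whichever node accepts the connection coordinates the query. The addresses, keyspace, and table are assumptions:

import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

val cluster = Cluster.builder()
  .addContactPoints("10.0.0.1", "10.0.0.2")   // any reachable nodes in the cluster will do
  .build()
val session = cluster.connect("my_app")       // the connected node acts as coordinator

val rows = session.execute("SELECT user_id, event_time FROM events LIMIT 10")
rows.asScala.foreach(row => println(row.getUUID("user_id")))

session.close()
cluster.close()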

Amazon – Streams, Producers, and Consumers

To create a stream

  1. Open the Amazon Kinesis console at https://console.aws.amazon.com/kinesis.
  2. Choose Go to Streams.
  3. In the navigation bar, expand the region selector and choose a region.
  4. Choose Create Stream.
  5. Type a name for your stream (for example, StockTradeStream).
  6. Type 1 for the number of shards, but leave Estimate the number of shards you’ll need collapsed.
  7. Choose Create.

Create IAM Policy and User

Producer

Actions: DescribeStream
Resource: Amazon Kinesis stream
Purpose: Before attempting to write records, the producer should check if the stream exists and is active.

Actions: PutRecord, PutRecords
Resource: Amazon Kinesis stream
Purpose: Write records to Streams.

Consumer

Actions: DescribeStream
Resource: Amazon Kinesis stream
Purpose: Before attempting to read records, the consumer checks if the stream exists and is active, and which shards the stream contains.

Actions: GetRecords, GetShardIterator
Resource: Amazon Kinesis stream
Purpose: Read records from a Streams shard.

Actions: CreateTable, DescribeTable, GetItem, PutItem, PutMetricData, Scan, UpdateItem
Resource: Amazon DynamoDB table
Purpose: If the consumer is developed using the Kinesis Client Library (KCL), it needs permissions on a DynamoDB table to track the processing state of the application. The first consumer started creates the table.

Actions: DeleteItem
Resource: Amazon DynamoDB table
Purpose: For when the consumer performs split/merge operations on Streams shards.

Actions: PutMetricData
Resource: Amazon CloudWatch
Purpose: The KCL also uploads metrics to CloudWatch, which are useful for monitoring the application.

 

To create an IAM policy

  1. Determine the Amazon Resource Name (ARN) for the new stream. The ARN format is as follows:
    arn:aws:kinesis:region:account:stream/name
    region
    The region code; for example, us-west-2. For more information, see Region and Availability Zone Concepts.

    account
    The AWS account ID, as shown in Account Settings.

    name
    The name of the stream from Step 1: Create a Stream, which is StockTradeStream.

  2. Choose Select next to Policy Generator.
  3. Choose Amazon Kinesis as the AWS service.
  4. Select DescribeStream, GetShardIterator, GetRecords, PutRecord, and PutRecords as the allowed actions.
  5. Type the ARN that you created in Step 1.
  6. Use Add Statement for each of the following:

    AWS Service: Amazon DynamoDB
    Actions: CreateTable, DeleteItem, DescribeTable, GetItem, PutItem, PutMetricData, Scan, UpdateItem
    ARN: The ARN you created in Step 2

    AWS Service: Amazon CloudWatch
    Actions: PutMetricData
    ARN: *

    The asterisk (*) is used when specifying an ARN is not required. In this case, it’s because there is no specific resource in CloudWatch on which the PutMetricData action is invoked.

 
Amazon – Kinesis Streams Producers – Using the Kinesis Streams API with the AWS SDK for Java

PutRecords Example

The following code creates 100 data records with sequential partition keys and puts them in a stream called DataStream.

// credentialsProvider is assumed to be configured elsewhere.
AmazonKinesisClient amazonKinesisClient = new AmazonKinesisClient(credentialsProvider);

PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName("DataStream");

// Build 100 records, each with its own sequential partition key.
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

// Send all 100 records in a single PutRecords call.
putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
System.out.println("Put Result: " + putRecordsResult);

Amazon – Kinesis Streams Producers – Using the Kinesis Producer Library

Role of the KPL

The KPL is an easy-to-use, highly configurable library that helps you write to an Amazon Kinesis stream. It acts as an intermediary between your producer application code and the Streams API actions. The KPL performs the following primary tasks (a usage sketch follows the note below):

  • Writes to one or more Amazon Kinesis streams with an automatic and configurable retry mechanism
  • Collects records and uses PutRecords to write multiple records to multiple shards per request
  • Aggregates user records to increase payload size and improve throughput
  • Integrates seamlessly with the Amazon Kinesis Client Library (KCL) to de-aggregate batched records on the consumer
  • Submits Amazon CloudWatch metrics on your behalf to provide visibility into producer performance

Note that the KPL is different from the Streams API that is available in the AWS SDKs. The Streams API helps you manage many aspects of Streams (including creating streams, resharding, and putting and getting records), while the KPL provides a layer of abstraction specifically for ingesting data.
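
A minimal KPL producer sketch, shown here in Scala on top of the Java library. The region, partition key, and payload are assumptions (the stream name reuses StockTradeStream from the console steps above), and production code would configure KinesisProducerConfiguration more carefully:

import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration}

val config   = new KinesisProducerConfiguration().setRegion("us-east-1")
val producer = new KinesisProducer(config)

// addUserRecord is asynchronous: the KPL batches, aggregates, and retries
// behind the scenes, and the returned future completes when the record lands.
val future = producer.addUserRecord(
  "StockTradeStream", "partitionKey-1", ByteBuffer.wrap("some payload".getBytes("UTF-8")))

producer.flushSync()   // block until everything buffered has been sent
producer.destroy()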

Amazon Kinesis – describe stream

After the createStreamRequest object is configured, create a stream by calling the createStream method on the client.

After calling createStream, wait for the stream to reach the ACTIVE state before performing any operations on the stream. To check the state of the stream, call the describeStream method.

 

client.createStream( createStreamRequest );
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

// Poll for up to 10 minutes, waiting for the stream to become ACTIVE.
long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
while ( System.currentTimeMillis() < endTime ) {
  try {
    Thread.sleep(20 * 1000);
  } 
  catch ( Exception e ) {}
  
  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
    //
    // sleep for one second before checking again
    //
    try {
      Thread.sleep( 1000 );
    }
    catch ( Exception e ) {}
  }
  catch ( ResourceNotFoundException e ) {
    // The stream is not visible yet; keep polling until the deadline.
  }
}
if ( System.currentTimeMillis() >= endTime ) {
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}

Amazon – Kinesis – create stream client and stream

The following code creates the Streams client and sets the endpoint information for the client. This overload of setEndpoint also includes the service name and region; set serviceName to kinesis.

client = new AmazonKinesisClient();
client.setEndpoint(endpoint, serviceName, regionId);


Amazon API Gateway

Region Name: US East (N. Virginia)
Region: us-east-1
Endpoint: apigateway.us-east-1.amazonaws.com
Protocol: HTTPS

 

 

Amazon Kinesis Streams

Region Name: US East (N. Virginia)
Region: us-east-1
Endpoint: kinesis.us-east-1.amazonaws.com
Protocol: HTTPS
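
For example, plugging the Kinesis Streams row of that table into the setEndpoint overload shown above (reusing the client variable from that snippet) would look roughly like:

client.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1")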

Create the Stream

Now that you have created your Streams client, you can create a stream to work with, which you can accomplish with the Streams console, or programmatically. To create a stream programmatically, instantiate a CreateStreamRequest object and specify a name for the stream and the number of shards for the stream to use.

CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName( myStreamName );
createStreamRequest.setShardCount( myStreamSize );