Friday, April 19, 2024

Tuning SQL Server

Note that diagnosing the query plan in MS SQL Server is not hugely different to examining Spark query plans. Some operations are conceptually the same.

Similarly, wrapping indexed columns in a function will remove any benefit the index may bring, just as it does for Spark's predicate pushdowns. In both, queries must be sargable [Brent Ozar's blog]. SARGs are search arguments and "a sargable predicate is one of form (or which can be put into the form) column comparison-operator value". [SO]
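
As a rough illustration of the point about functions on indexed columns, here is a minimal sketch. It uses SQLite (via Python's standard library) purely as a stand-in, since it needs no server; SQL Server's plans look different but the principle is the same. The table, column and index names are made up for the example.

```python
import sqlite3

def query_plan(conn, sql):
    """Return the human-readable 'detail' column of SQLite's EXPLAIN QUERY PLAN."""
    return [row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + sql)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, surname TEXT, age INTEGER)")
conn.execute("CREATE INDEX idx_surname ON people (surname)")

# Sargable: column comparison-operator value, so the index can be searched.
sargable = query_plan(conn, "SELECT * FROM people WHERE surname = 'SMITH'")

# Not sargable: the column is hidden inside a function, so every row is visited.
not_sargable = query_plan(conn, "SELECT * FROM people WHERE upper(surname) = 'SMITH'")

print(sargable)
print(not_sargable)
```

The first plan should be a SEARCH that names idx_surname while the second should be a SCAN of the whole table.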

Query Plans

Heap Scan

Before we look at the scan aspect, what is a heap? "A heap is a table without a clustered index... One or more nonclustered indexes can be created on tables stored as a heap."

What's the advantage of a heap? "Because data is inserted without enforcing a strict order, the insert operation is usually faster than the equivalent insert into a clustered index." 

There are many disadvantages to a heap, notably, "Do not use a heap when the data is frequently grouped together." [Microsoft]

Nested Loop Joins

"The database usually performs this matching step in a nested-loops join by indexed lookups on the join key, repeated for each row from the driving table." [1] The database filters as it goes.

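A minimal sketch of the nested-loops idea, assuming the index on the join key already exists (here a plain dict stands in for it). The function and the sample data are hypothetical, not anything from the book.

```python
def nested_loops_join(driving_rows, index_on_join_key, key, keep=lambda row: True):
    """Probe an existing index once for each driving row that survives the filter."""
    for outer in driving_rows:
        if not keep(outer):  # filter as we go, before paying for the lookup
            continue
        for inner in index_on_join_key.get(outer[key], []):
            yield {**inner, **outer}

# Stand-in for an index on customers.customer_id
customers_by_id = {
    1: [{"customer_id": 1, "name": "Ada"}],
    2: [{"customer_id": 2, "name": "Grace"}],
}
orders = [
    {"order_id": 10, "customer_id": 1, "total": 5},
    {"order_id": 11, "customer_id": 2, "total": 50},
    {"order_id": 12, "customer_id": 3, "total": 70},
]

joined = list(nested_loops_join(orders, customers_by_id, "customer_id",
                                keep=lambda order: order["total"] > 10))
print(joined)
```

Only order 11 comes back: order 10 is filtered out and order 12 has no matching customer.
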
Hash Joins

"Based on table and index statistics, the cost-based optimizer estimates which of these two independent tables will return fewer rows after discarding filtered rows. It chooses to hash the complete results from that single-table query...

"It then executes the larger query ... returning the driving rowset. As each row exits this step, the database executes the same hash function on its join key and uses the hash-function result to go directly to the corresponding hash bucket for the other rowset. When it reaches the right hash bucket, the database searches the tiny list of rows in that bucket to find matches."

The catch with this approach is you "hope those hash buckets fit entirely in memory, but if necessary, the database temporarily dedicates scratch disk space to hold the buckets... A large prehashed rowset could require unexpected disk scratch space, performing poorly and possibly even running out of space."
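
The build-then-probe shape described in the quotes can be sketched like this. It is an in-memory toy with made-up names and data; it has no notion of spilling buckets to scratch disk, which is exactly the part that hurts in real life.

```python
from collections import defaultdict

def hash_join(smaller_rows, larger_rows, key):
    """Hash the smaller rowset, then stream the larger one past the buckets."""
    buckets = defaultdict(list)
    for row in smaller_rows:          # build phase: must fit in memory here
        buckets[row[key]].append(row)
    for row in larger_rows:           # probe phase: one bucket lookup per row
        for match in buckets.get(row[key], ()):
            yield {**match, **row}

departments = [{"dept_id": 1, "dept": "Eng"}, {"dept_id": 2, "dept": "Ops"}]
employees = [
    {"emp": "a", "dept_id": 1},
    {"emp": "b", "dept_id": 2},
    {"emp": "c", "dept_id": 1},
    {"emp": "d", "dept_id": 9},
]

joined = list(hash_join(departments, employees, "dept_id"))
print(joined)
```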

Sort-merge Joins

Spark does exactly this. In a sort-merge join, the database "reads the two tables independently but, instead of matching rows by hashing, it presorts rowsets on the join key and merges the sorted lists... Once the two lists are sorted, the matching process is fast but presorting lists is expensive and nonrobust." [1] For this reason, hash joins are preferred. They don't have this downside but have all the same advantages.
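
For comparison, a small sketch of presort-then-merge. Again the function name and data are invented for illustration; a real engine sorts in a far more sophisticated way.

```python
def sort_merge_join(left_rows, right_rows, key):
    """Presort both rowsets on the join key, then merge the sorted lists."""
    left = sorted(left_rows, key=lambda row: row[key])    # the expensive part
    right = sorted(right_rows, key=lambda row: row[key])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][key], right[j][key]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of right-hand rows sharing this key...
            j_end = j
            while j_end < len(right) and right[j_end][key] == left_key:
                j_end += 1
            # ...and pair it with every left-hand row sharing the key.
            while i < len(left) and left[i][key] == left_key:
                out.extend({**left[i], **r} for r in right[j:j_end])
                i += 1
            j = j_end
    return out

left = [{"k": 2, "l": "b"}, {"k": 1, "l": "a"}, {"k": 2, "l": "c"}]
right = [{"k": 2, "r": "x"}, {"k": 3, "r": "y"}, {"k": 2, "r": "z"}]
merged = sort_merge_join(left, right, "k")
print(merged)
```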

Indices

You can see all the indexes by executing: 

select * from sys.indexes

But remember that "cost-based optimizers often do a terrible job if they do not have statistics on all the tables and indexes involved in the query. It is therefore imperative to maintain statistics on tables and indexes reliably; this includes regenerating statistics anytime table volumes change much or anytime tables or indexes are rebuilt." [1]

Clustered vs. Non Clustered Indexes

Clustered indexes actually change where rows are stored on disk. As a result, "there can be only one clustered index per table" [Microsoft].  Non-clustered indexes by contrast are just pointers to the row data.

Adding a clustered index can be an intense operation that is measured in minutes or even hours. For instance, adding a clustered index to a table of 765k rows and about 20 columns that are dates and varchars (13 columns totalling a size of about 1800) takes about 15 minutes on a 12-core Azure SQL Server. But this one index reduced the TotalSubtreeCost from c. 131k to 71k.

Bit-map indexes

"Each stored value of a bit-mapped index points to what amount to a list of yes/no bits that map to the whole list of table rows. These bit strings are easy to AND and OR together with other bit strings of other bit-mapped indexes... The big catch is that such bit strings are expensive to maintain in sync with frequently changing table contents... Bit-mapped indexes work best for tables that are mostly read-only... The best scenario for success is precisely the data-warehouse scenario for which bit-mapped indexes were designed." [1]

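To make the AND/OR idea concrete, here is a toy sketch in which each distinct value gets one Python integer used as a bit string, with bit N set when row N holds that value. All names and data are hypothetical.

```python
def build_bitmap_index(rows, column):
    """Map each distinct value of `column` to an int whose set bits mark matching rows."""
    index = {}
    for position, row in enumerate(rows):
        value = row[column]
        index[value] = index.get(value, 0) | (1 << position)
    return index

def matching_rows(rows, bitmap):
    """Return the rows whose bit is set in `bitmap`."""
    return [row for position, row in enumerate(rows) if (bitmap >> position) & 1]

rows = [
    {"region": "EU", "status": "open"},
    {"region": "US", "status": "open"},
    {"region": "EU", "status": "closed"},
    {"region": "EU", "status": "open"},
]
by_region = build_bitmap_index(rows, "region")
by_status = build_bitmap_index(rows, "status")

eu_and_open = by_region["EU"] & by_status["open"]      # rows 0 and 3
us_or_closed = by_region["US"] | by_status["closed"]   # rows 1 and 2
print(bin(eu_and_open), bin(us_or_closed))
```

Note that inserting or updating a row means touching the bit strings of every indexed column, which is the maintenance cost the quote warns about.
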
Columnstore

SQL Server seems to be stealing ideas from other big data tools as it now allows columnar storage. "Columnstore indexes are the standard for storing and querying large data warehousing fact tables." [Microsoft] Adding this to one of my tables made the cost drop two orders of magnitude... but the query still took over an hour before I killed it. I guess you should never judge a query by its cost [Brent Ozar].

[Aside: I eventually made the query work in a reasonable if not stellar duration by dropping a clustered index and having only the columnstore index, not the two together as I previously had.]

[1] SQL Tuning, Dan Tow

Saturday, April 6, 2024

When adding more CPUs does not help distressed CPUs

This is an interesting problem on Discourse where the symptoms belie the cause. Here, a very beefy Spark cluster is taking a long time to process an (admittedly) large amount of data. However, it's the CPUs that are getting hammered.

Insanely high CPU usage

The temptation at this point is to add more CPU resources but this won't help much.

When your Spark jobs that are not computationally intensive are using large amounts of CPU, there's an obvious suspect. Let's check time spent in Garbage Collection:


Insanely large GC Times

Shuffle per worker seems modest but look at those GC Times. In a five-hour job, nearly two hours are spent just garbage collecting.

And this is something of a surprise to people new to Spark. Sure, it delivers on its promise to process more data than can fit in memory but if you want it to be performant, you need to give it as much memory as possible.  

Friday, April 5, 2024

Network Adventures in Azure Databricks

My Azure Databricks cluster could not see one of my Blob containers although it could see others in the same subscription. The error in Databricks looked something like this: 

ExecutionError: An error occurred while calling o380.ls.
: Status code: -1 error code: null error message: java.net.SocketTimeoutException: connect timed outjava.net.SocketTimeoutException: connect timed out
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:423)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:274)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:214)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464)
...

My first suspicion was that because they were in different resource groups, this could explain things.

Resource groups
"Resource groups are units of deployment in ARM [Azure Resource Manager]. 
"They are containers grouping multiple resource instances in a security and management boundary. 
"A resource group is uniquely named in a subscription. 
"Resources can be provisioned on different Azure regions and yet belong to the same resource group. 
"Resource groups provide additional services to all the resources within them. Resource groups provide metadata services, such as tagging, which enables the categorization of resources; the policy-based management of resources; RBAC; the protection of resources from accidental deletion or updates; and more... 
"They have a security boundary, and users that don't have access to a resource group cannot access resources contained within it.  Every resource instance needs to be part of a resource group; otherwise, it cannot be deployed." [Azure for Architects]
That last paragraph is interesting because I can access the container I want via the Azure portal. So, a friendly sysadmin suggested this was barking up the wrong tree and instead looked at:

Virtual Networks
"A VNet is required to host a virtual machine. It provides a secure communication mechanism between Azure resources so that they can connect to each other. 
"The VNets provide internal IP addresses to the resources, facilitate access and connectivity to other resources (including virtual machines on the same virtual network), route requests, and provide connectivity to other networks. 
"A virtual network is contained within a resource group and is hosted within a region, for example, West Europe. It cannot span multiple regions but can span all datacenters within a region, which means we can span virtual networks across multiple Availability Zones in a region. For connectivity across regions, virtual networks can be connected using VNet-to-VNet connectivity." [Azure for Architects]
Nothing obvious here. Both Databricks and the container were on the same network. However, they weren't on the same subnet.

Network Security Groups
"Subnets provide isolation within a virtual network. They can also provide a security boundary. Network security groups (NSGs) can be associated with subnets, thereby restricting or allowing specific access to IP addresses and ports. Application components with separate security and accessibility requirements should be placed within separate subnets." [Azure for Architects]
And this proved to be the problem. Databricks and the container are on the same virtual network but not the same subnet and there was an NSG blocking communication between these subnets.

Note that changes can take a few minutes to propagate, sometimes faster but sometimes slower. My sysadmin says he has seen it take up to an hour.
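
Since the symptom was a connect timeout, a quick TCP reachability check run from the cluster can help separate network-level blocks (like an NSG) from permission problems. This is only a sketch: the function name is mine and the host in the usage note is a placeholder, not the real storage account.

```python
import socket

def can_connect(host, port, timeout_seconds=5.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_seconds):
            return True
    except OSError:  # covers timeouts, refused connections and DNS failures
        return False
```

From a notebook you might call something like can_connect("mystorageaccount.dfs.core.windows.net", 443), substituting your own account name. A False here, while the portal works fine from your desktop, points at the network path rather than at RBAC.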

AWS Real Estate

Just some notes I've made playing around with AWS real estate.

ECS
Amazon's offering that scales Docker containers. Whereas EC2 is simply a remote VM, ECS is a "logical grouping of EC2 machines" [SO]

Fargate
Is a serverless version of EC2 [SO].
 
Kinesis
A proprietary Amazon Kafka replacement. While Kafka writes data locally, Kinesis uses a quorum of shards.

MSK
Amazon also offers a hosted Kafka solution called MSK (Managed Streaming for Kafka). 

Lambda
Runs containers, like Docker, that exist for up to 15 minutes and whose storage is ephemeral.

Glue
A little like Hive. It has crawlers, which are batch jobs that compile metadata, thus doing some of the job of Hive's metastore. In fact, you can configure the metastore that Spark uses to have Glue as its backing store. See:

EMR
EMR is AWS's MapReduce tool on which we can run Spark. "You can configure Hive to use the AWS Glue Data Catalog as its metastore." [docs] If you want to run Spark locally but still take advantage of Glue, follow these instructions.

Athena
Athena is AWS's hosted Trino offering. You can make data in S3 buckets available to Athena by using Glue crawlers.

Step Functions
AWS's orchestration of different services within Amazon's cloud.

CodePipeline
...is AWS's CI/CD offering.

Databases
DynamoDB is a key/value store and Aurora is a distributed relational DB.

Sunday, March 24, 2024

Iceberg locks and catalogs

Although the Hive Metastore (HMS) is used for most Spark implementations, it's not recommended for Iceberg. HMS does not support retries and deconflicting commits.

"HadoopCatalog has a number of drawbacks and we strongly discourage it for production use.  There are certain features like rename and drop that may not be safe depending on the storage layer and you may also require a lock manager to get atomic behavior.  JdbcCatalog is a much better alternative for the backing catalog." [Iceberg Slack]

Iceberg comes with a DynamoDB (AWS) implementation of the lock manager. Looking at the code, it appears that acquiring the lock uses an optimistic strategy. You can tell DynamoDB to put a row in the table iff it doesn't exist already. If it does, the underlying AWS library throws a software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException. There's a test for this in the AWS module here. It needs an AWS account to run.
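
The put-if-absent pattern can be sketched without an AWS account. Everything below is a hypothetical in-memory stand-in: it is neither Iceberg's lock manager nor the AWS SDK, just the shape of the idea, with a local exception playing the role of ConditionalCheckFailedException.

```python
import threading

class ConditionalCheckFailed(Exception):
    """Stand-in for the error raised when the row already exists."""

class InMemoryLockTable:
    """Toy table that supports 'put this row only if the key is absent'."""

    def __init__(self):
        self._rows = {}
        self._guard = threading.Lock()  # makes check-and-put a single step

    def put_if_absent(self, entity_id, owner_id):
        with self._guard:
            if entity_id in self._rows:
                raise ConditionalCheckFailed(entity_id)
            self._rows[entity_id] = owner_id

    def delete_if_owner(self, entity_id, owner_id):
        with self._guard:
            if self._rows.get(entity_id) == owner_id:
                del self._rows[entity_id]
                return True
            return False

def try_acquire(table, entity_id, owner_id):
    """Optimistically attempt the put; losing the race just means 'no lock'."""
    try:
        table.put_if_absent(entity_id, owner_id)
        return True
    except ConditionalCheckFailed:
        return False

table = InMemoryLockTable()
first = try_acquire(table, "db1.table1", "writer-a")
second = try_acquire(table, "db1.table1", "writer-b")
released = table.delete_if_owner("db1.table1", "writer-a")
third = try_acquire(table, "db1.table1", "writer-b")
print(first, second, released, third)
```

The loser of the race does no waiting inside the table itself; any retry or back-off policy sits in the caller.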

"This is necessary for a file system-based catalog to ensure atomic transaction in storages like S3 that do not provide file write mutual exclusion." [Iceberg docs] This is a sentiment echoed in this blog.

The issue is the rename, not the data transfer. "Each object transfer is atomic. That is, either a whole file is transferred, or none of it is. But the directory structure is not atomic and a failure can cause mv to fail mid-way." [AWS Questions]

In the old world of HDFS, Spark would write its output to a temporary directory then atomically rename that directory to that of the final destination. However, S3 is not a file system but a blob store and the notion of a directory is just that: notional. When we change a "directory's" name, all the files in a directory need to be renamed one-by-one and renaming all the files Spark outputs is not atomic in S3. Implementations that talk to their own file system must implement Hadoop's OutputCommitter and Spark will call these when preparing to write etc.

The only mention of the lock manager in "Apache Iceberg: The Definitive Guide" is:

"If you are using AWS Glue 3.0 with Iceberg 0.13.1, you must also set the additional configurations for using the Amazon DynamoDB lock manager to ensure atomic transactions. AWS Glue 4.0, on the other hand, uses optimistic locking by default."

which is a bit too cryptic for me. Apparently, the reason is that Glue 4.0 ships a different version of Iceberg that uses optimistic locking [Discourse].

Catalogs

Catalogs "allows [Iceberg] to ensure consistency with multiple readers and writers and discover what tables are available in the environment... the primary high level requirement for a catalog implementation to work as an Iceberg catalog is to map a table path (e.g., “db1.table1”) to the file path of the metadata file that has the table’s current state."
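
One way to picture that requirement is a map from table identifier to the current metadata file, updated only by compare-and-swap. The class below is a made-up in-memory sketch, not any real Iceberg catalog, and the S3 paths are placeholders.

```python
import threading

class CommitFailed(Exception):
    """Raised when another writer moved the pointer first."""

class InMemoryCatalog:
    """Toy catalog: table identifier -> location of the current metadata file."""

    def __init__(self):
        self._current = {}
        self._guard = threading.Lock()

    def create_table(self, identifier, metadata_location):
        with self._guard:
            if identifier in self._current:
                raise CommitFailed(identifier + " already exists")
            self._current[identifier] = metadata_location

    def current_metadata(self, identifier):
        return self._current[identifier]

    def commit(self, identifier, expected_location, new_location):
        """Swap the pointer only if it still points at what the writer read."""
        with self._guard:
            if self._current.get(identifier) != expected_location:
                raise CommitFailed(identifier + " changed since it was read")
            self._current[identifier] = new_location

catalog = InMemoryCatalog()
base = "s3://bucket/db1/table1/metadata/"  # placeholder path
catalog.create_table("db1.table1", base + "v1.metadata.json")

# Two writers read the same starting state...
seen_by_a = catalog.current_metadata("db1.table1")
seen_by_b = catalog.current_metadata("db1.table1")

# ...the first commit wins and the second must re-read and retry.
catalog.commit("db1.table1", seen_by_a, base + "v2.metadata.json")
try:
    catalog.commit("db1.table1", seen_by_b, base + "v2-b.metadata.json")
    conflict = False
except CommitFailed:
    conflict = True
print(conflict, catalog.current_metadata("db1.table1"))
```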

The Catalogs are:

  • Hadoop. Note that Hadoop is used loosely. "Note anytime you use a distributed file system (or something that looks like one) to store the current metadata pointer, the catalog used is actually called the 'hadoop' catalog." [1] The most important potential downside of this Catalog is "It requires the file system to provide a file/object rename operation that is atomic to prevent data loss when concurrent writes occur." And there are others. "A Hadoop catalog doesn’t need to connect to a Hive MetaStore, but can only be used with HDFS or similar file systems that support atomic rename. Concurrent writes with a Hadoop catalog are not safe with a local FS or S3." [Iceberg docs]
  • Hive. Apart from running an additional process (unlike the Hadoop catalog), "It requires the file system to provide a file/object rename operation that is atomic to prevent data loss when concurrent writes occur." [1]
  • AWS Glue. "Like the Hive catalog, it does not support multi-table transactions" [1]
  • Nessie gives a Git-like experience for data lakes but the two main disadvantages are that you must run the infrastructure yourself (like Hive) and it's not compatible with all engines.
  • REST is by nature simple, is implementation agnostic and "the REST catalog supports multi-table transactions". "REST Catalog is actually a protocol with a client implementation in the library.  There are examples of how to adapt that protocol to different catalog backends (like HMS or JDBC)... The REST protocol allows for advanced features that other catalogs cannot support, but that doesn't mean all of those features will be available for every REST implementation" [Slack]
  • JDBC is near ubiquitous but "it doesn’t support multi-table transactions". "With JDBC the database does the locking, so no external lock manager is required" [Slack]

So, which should you use? From contributor Daniel Weeks on Slack:

"If you're not using HMS currently, I would suggest going with JdbcCatalog, which you can also use directly or with a REST frontend... I would strongly suggest using JDBC Catalog unless there's something specific you need. HMS is built for hive and iceberg is not hive.  There is both a lot of complexity and baggage that comes with hive.  For example, if you change the table scheme directly in hive, it does not change the schema in your iceberg table.  Same with setting table properties. JDBC is super lightweight and native to iceberg, so if you don't have hive, I would avoid using it.

"There are multiple projects that are starting to adopt REST and I expect that only to grow, but that doesn't mean you necessarily need it right now.  The main thing to think about is using multiple catalogs (not limit yourself to a single one). You can use JDBC directly now (most engines support it), but you can always add a REST frontend later.  They can co-exist and REST can even proxy to your JDBC backend"

[1] "Apache Iceberg: The Definitive Guide"

Saturday, March 9, 2024

Big Data and CPU Caches

I'd previously posted about how Spark's data frame schema is an optimization not an enforcement. If you look at Spark's code, schemas save checking whether something is null. That is all. 

Can this really make so much of a difference? Surprisingly, omitting a null check can optimize your code by an order of magnitude.

As ever, the devil is in the detail. A single null check is hardly likely to make a difference to your code. But when you are checking billions of times, you need to take it seriously. 

There is another dimension to this problem. If you're checking the same reference (or a small set of them) then you're probably going to be OK. But if you are null checking large numbers of references, this is where you're going to see performance degradation.

The reason is that a small number of references can live happily in your CPU cache. As this number grows, they're less likely to be cached and your code will force memory to be loaded from RAM into the CPU.

Modern CPUs cache data to avoid hitting RAM. My 2.40GHz Intel Xeon E-2286M has three levels of cache, each bigger (and slower) than the last:

$ sudo dmidecode -t cache  
Cache Information                       
Socket Designation: L1 Cache                     
Maximum Size: 512 kB                 
...                  
Socket Designation: L2 Cache                   
Maximum Size: 2048 kB                
...                      
Socket Designation: L3 Cache                      
Maximum Size: 16384 kB             

Consequently, the speed at which we can randomly access an array of 64-bit numbers depends on the size of the array. Here is some code that demonstrates this. The results look like this:


Who would have thought little optimizations on big data can make such a huge difference?
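
For anyone who wants to poke at the random-access experiment described above, here is a rough sketch. It is not the code the post links to, and because it is pure Python the interpreter's overhead will hide much of the effect; a compiled language or the JVM shows the cache cliffs far more clearly. The function name and the sizes chosen are my own.

```python
import random
import time
from array import array

def time_random_reads(n_elements, n_reads=100_000, seed=42):
    """Return (nanoseconds per read, checksum) for random reads of 64-bit ints."""
    rng = random.Random(seed)
    data = array("q", range(n_elements))  # 'q' is a signed 64-bit integer
    indices = [rng.randrange(n_elements) for _ in range(n_reads)]
    checksum = 0
    start = time.perf_counter()
    for i in indices:
        checksum += data[i]
    elapsed = time.perf_counter() - start
    return elapsed / n_reads * 1e9, checksum

# From 8 kB (fits in L1) up to 32 MB (bigger than the 16 MB L3 above).
for exponent in (10, 14, 18, 22):
    ns_per_read, _ = time_random_reads(2 ** exponent)
    print(f"{2 ** exponent * 8 // 1024:>8} kB  {ns_per_read:8.1f} ns/read")
```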

Saturday, February 24, 2024

Home made Kubernetes cluster

When trying to run ArgoCD, I came across this problem that was stopping me from connecting. Using kubectl port-forward..., I was able to finally connect. But even then, if I ran:

$ kubectl get services --namespace argocd
NAME                                      TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
argocd-applicationset-controller          ClusterIP      10.98.20.142     <none>        7000/TCP,8080/TCP            19h
argocd-dex-server                         ClusterIP      10.109.252.231   <none>        5556/TCP,5557/TCP,5558/TCP   19h
argocd-metrics                            ClusterIP      10.106.130.22    <none>        8082/TCP                     19h
argocd-notifications-controller-metrics   ClusterIP      10.109.57.97     <none>        9001/TCP                     19h
argocd-redis                              ClusterIP      10.100.158.58    <none>        6379/TCP                     19h
argocd-repo-server                        ClusterIP      10.111.224.112   <none>        8081/TCP,8084/TCP            19h
argocd-server                             LoadBalancer   10.102.214.179   <pending>     80:30081/TCP,443:30838/TCP   19h
argocd-server-metrics                     ClusterIP      10.96.213.240    <none>        8083/TCP                     19h

Why was my EXTERNAL-IP still pending? It appears that this is a natural consequence of running my K8s cluster in Minikube [SO].

So, I decided to build my own Kubernetes cluster. This step-by-step guide proved really useful. I built a small cluster of 2 nodes on heterogeneous hardware. Note that although you can use different OSs and hardware, you really need to use the same version of K8s on all boxes (see this SO).

$ kubectl get nodes -o wide
NAME    STATUS   ROLES           AGE   VERSION   INTERNAL-IP     EXTERNAL-IP   OS-IMAGE             KERNEL-VERSION      CONTAINER-RUNTIME
adele   Ready    <none>          18h   v1.28.2   192.168.1.177   <none>        Ubuntu 18.04.6 LTS   5.4.0-150-generic   containerd://1.6.21
nuc     Ready    control-plane   18h   v1.28.2   192.168.1.148   <none>        Ubuntu 22.04.4 LTS   6.5.0-18-generic    containerd://1.7.2

Great! However, Flannel did not seem to be working properly:

$ kubectl get pods --namespace kube-flannel -o wide 
NAME                    READY   STATUS             RESTARTS         AGE    IP              NODE    NOMINATED NODE   READINESS GATES
kube-flannel-ds-4g8gg   0/1     CrashLoopBackOff   34 (2m53s ago)   152m   192.168.1.148   nuc     <none>           <none>
kube-flannel-ds-r4xvt   0/1     CrashLoopBackOff   26 (3m11s ago)   112m   192.168.1.177   adele   <none>           <none>

And journalctl -fu kubelet was puking  "Error syncing pod, skipping" messages.

Aside: Flannel is a container on each node that coordinates the segmentation of the virtual network. For coordination, it can use etcd, which can be thought of like Zookeeper in the Java ecosystem. "Flannel does not control how containers are networked to the host, only how the traffic is transported between hosts." [GitHub]

The guide seemed to omit one detail that led me to see the Flannel container puking something like this error:

E0427 06:08:23.685930 13405 memcache.go:265] couldn’t get current server API group list: Get “https://X.X.X.X:6443/api?timeout=32s 2”: dial tcp X.X.X.X:6443: connect: connection refused

Following this SO answer revealed that the cluster's CIDR had not been set. So, following this [SO] advice, I patched it like so:

kubectl patch node nuc -p '{"spec":{"podCIDR":"10.244.0.0/16"}}'
kubectl patch node adele -p '{"spec":{"podCIDR":"10.244.0.0/16"}}'

which will work until the next reboot (one of the SO answers describes how to make that permanent as does this one).
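
One thing to be aware of: the patch above hands both nodes the whole /16. A cluster initialised with a pod network CIDR would normally give each node its own smaller slice (a /24 by default), so if pods on different nodes start clashing, per-node ranges are worth trying. That is my assumption about a typical setup rather than something the SO answers state. Carving up the range is easy with Python's ipaddress module (the function name is made up):

```python
import ipaddress

def per_node_pod_cidrs(cluster_cidr, node_names, node_prefix=24):
    """Assign each node its own non-overlapping subnet of the cluster CIDR."""
    subnets = ipaddress.ip_network(cluster_cidr).subnets(new_prefix=node_prefix)
    return {name: str(subnet) for name, subnet in zip(node_names, subnets)}

cidrs = per_node_pod_cidrs("10.244.0.0/16", ["nuc", "adele"])
print(cidrs)
```

This gives 10.244.0.0/24 for nuc and 10.244.1.0/24 for adele, which could then be used in place of the /16 in the two patch commands.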

Anyway, this was the puppy and now the cluster seems to be behaving well.

Incidentally, this gives a lot of log goodies:

kubectl cluster-info dump