3 min read

Databricks : Encoding Lat & Lon for NPI Locations using the Census API

💡
The NPPES data dissemination readme outlines file covers columns, layouts, and timing of the NPI releases. This post covers using the Census API and Databricks to lookup latitude and longitude based from the providers location.

As part of our price transparency work, I wanted to encode Idaho provider location addresses into lat and lon using the Census API. The NPPES NPI data contains provider identifiers for organizations and individuals across the US. These identifiers map back to provider billing and practice locations. I'm interested in understanding the cost of services relative to provider locations (not the billing address) for price transparency data.

The Census API provides a single or bulk address lookup that returns a cleaned address, lat, long and tiger line information. Using the NPPES NPI file, I located all practice locations containing ID for the Provider Business Practice Location Address State Name column and then called the API using a UDF function. I'll cover each step below:

Build the NPI Data Frame

The single address Census API requires an address, city, state and zip. I started by creating a data frame containing only provider addresses in Idaho. That data frame becomes the input for single calls to the API. Using the bulk census API is also an option.

8GB+ is not uncommon for NPI file sizes. I only want to extract Idaho providers first practice locations - a small subset of the raw column. In the following code, I read the raw NPI CSV from the Databricks file systems, extracting practice columns and renaming them for easier use. I also split the provider zip into 5 digit and 4 digit locations - NPI data often contains the full nine digit zip.

from pyspark.sql.functions import col
from pyspark.sql import functions as F

df = spark.read \
    .options(header='True') \
    .csv("dbfs:/mnt/npi/npi.csv")

MAP = [
    ('npi', 'NPI'),
    ('code', 'Entity Type Code'),
    ('ein', 'Employer Identification Number (EIN)'),
    ('org_name', 'Provider Organization Name (Legal Business Name)'),
    ('last_name', 'Provider Last Name (Legal Name)'),
    ('practice_address_1', 'Provider First Line Business Practice Location Address'),
    ('practice_address_2', 'Provider Second Line Business Practice Location Address'),
    ('practice_city', 'Provider Business Practice Location Address City Name'),
    ('practice_state', 'Provider Business Practice Location Address State Name'),
    ('practice_zip', 'Provider Business Practice Location Address Postal Code')
]

rdd = sc.parallelize(MAP)
npi_key_value_rdd = rdd.map(lambda x: (x[1], x[0]))

keys = npi_key_value_rdd.keys().collect()
values = npi_key_value_rdd.values().collect()

npis = df \
    .select(keys)

project_list = [col(col_name).alias(npi_key_value_rdd.lookup(col_name)[0]) for col_name in npis.columns]

npis = npis \
    .select(*project_list) 

npis = npis \
    .filter(npis.practice_state == 'ID') \
    .withColumn("zip5", F.expr("substring(practice_zip, 1, 5)"))\
    .withColumn("zip4", F.expr("substring(practice_zip, 6, 4)")) 

The map value defines the source columns, the key defines the names of the output column. Rather than map each column directly in a withColumn call, I defined the map. The map value converts to an rdd to define the project_list used in our select.

Define a UDF to Call the Census API

Census.gov provides a free endpoint to encode an address to a lon and lat value. If a match is found, the following object is returned. You can find more at the Census Geocoding Services site.

    return {
        'lon' : x,
        'lat': y,
        'tiger_side' : tiger_side,
        'tiger_line_id' : tiger_line_id,
        'response' : repsonse_text,
        'status' :  status,
        'status_message' : status_message
    }

To call the census API, we created a UDF function in python used in a withColumn call to add a latlon column value. Here is the UDF.

coord = StructType([
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("tiger_side", StringType(), True),
    StructField("tiger_line_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("status_message", StringType(), True),
    StructField("response", StringType(), True)
])

@udf(returnType=coord)
def get_census_lat_lon(arr):
    npi = arr[0]
    address = arr[1]
    address_two = arr[2]
    city = arr[3]
    state = arr[4]
    zipcode = arr[5]

    oa = "{}, {}, {} {}".format(address, city, state, zipcode)

    if address_two:
        oa = "{} {}, {}, {} {}".format(address, address_two, city, state, zipcode)

    oa_url = urllib.parse.quote(oa)

    # Build Request for Census API - handling error 
    error = None
    x = None
    y = None
    tiger_side = None
    tiger_line_id = None
    status = 'FAIL'
    status_message = None

    try: 
        request_uri = "https://geocoding.geo.census.gov/geocoder/locations/onelineaddress?address={}&benchmark=2020&format=json".format(oa_url)

        response = requests.get(
            request_uri,
            headers={"content-type":"json"}
        )

        response_json = response.json()
        repsonse_text = response.text
        response_status = response.status_code

        if response_status == 200:
            result = response_json['result']

            #validate one or more address match returned
            if not 'addressMatches' in result or len(result['addressMatches']) == 0:
                status = 'FAIL'
                status_message = "{} : addressMatches array empty in response".format(npi)
            else:
                first = response_json['result']['addressMatches'][0]
                coord = first['coordinates']
                tiger = first['tigerLine']

                x = coord['x']
                y = coord['y']
                tiger_side = tiger['side']
                tiger_line_id = tiger['tigerLineId']
                status = 'PASS'

    except Exception as e:
        status_message = "{} : {}".format(npi, e)


    return {
        'lon' : x,
        'lat': y,
        'tiger_side' : tiger_side,
        'tiger_line_id' : tiger_line_id,
        'response' : repsonse_text,
        'status' :  status,
        'status_message' : status_message
    }

The UDF returns a coord object defined at top, which includes the lat, long, tiger_side, tiger_line_id, status, status_message and response. The input is an array of npi, practice_address_1, practice_address_2, practice city, practice_state and zip5 from the NPI file.

batch_df = batch_df \
    .withColumn("latlon", get_census_lat_lon(array(col("npi"), col("practice_address_1"), col("practice_address_2"), col("practice_city"), col("practice_state"), col("zip5")))) \
    .select(*npis.columns, "latlon.*")