Databricks: Encoding Lat & Lon for NPI Locations Using the Census API
As part of our price transparency work, I wanted to encode Idaho provider location addresses into lat and lon using the Census API. The NPPES NPI data contains provider identifiers for organizations and individuals across the US. These identifiers map back to provider billing and practice locations. I'm interested in understanding the cost of services relative to provider locations (not the billing address) for price transparency data.
The Census API provides single and bulk address lookups that return a cleaned address, lat, lon, and TIGER/Line information. Using the NPPES NPI file, I located all practice locations with ID in the Provider Business Practice Location Address State Name column and then called the API from a UDF. I'll cover each step below:
Build the NPI Data Frame
The single address Census API requires an address, city, state and zip. I started by creating a data frame containing only provider addresses in Idaho. That data frame becomes the input for single calls to the API. Using the bulk census API is also an option.
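For larger volumes, the bulk geocoder accepts a headerless CSV of addresses in one POST. A minimal sketch of building that payload; the `to_batch_csv` helper is hypothetical, and the endpoint and field names should be verified against the Census Geocoding Services documentation:

```python
import csv
import io

def to_batch_csv(rows):
    """Build the CSV payload the bulk geocoder expects:
    one record per line as id,street,city,state,zip (no header)."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    for r in rows:
        writer.writerow([r['npi'], r['address'], r['city'], r['state'], r['zip']])
    return buf.getvalue()

payload = to_batch_csv([
    {'npi': '1234567890', 'address': '700 W Jefferson St',
     'city': 'Boise', 'state': 'ID', 'zip': '83702'},
])

# Hypothetical upload, following the documented batch endpoint:
# requests.post(
#     "https://geocoding.geo.census.gov/geocoder/locations/addressbatch",
#     files={"addressFile": ("addresses.csv", payload)},
#     data={"benchmark": "2020"},
# )
```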
8GB+ is not uncommon for NPI file sizes, and I only want to extract Idaho providers' first practice locations - a small subset of the raw file. In the following code, I read the raw NPI CSV from the Databricks file system (DBFS), extracting the practice-location columns and renaming them for easier use. I also split the provider ZIP into 5-digit and 4-digit segments, since NPI data often contains the full nine-digit ZIP.
from pyspark.sql.functions import col
from pyspark.sql import functions as F

df = spark.read \
    .options(header='true') \
    .csv("dbfs:/mnt/npi/npi.csv")

# (output name, source column) pairs
MAP = [
    ('npi', 'NPI'),
    ('code', 'Entity Type Code'),
    ('ein', 'Employer Identification Number (EIN)'),
    ('org_name', 'Provider Organization Name (Legal Business Name)'),
    ('last_name', 'Provider Last Name (Legal Name)'),
    ('practice_address_1', 'Provider First Line Business Practice Location Address'),
    ('practice_address_2', 'Provider Second Line Business Practice Location Address'),
    ('practice_city', 'Provider Business Practice Location Address City Name'),
    ('practice_state', 'Provider Business Practice Location Address State Name'),
    ('practice_zip', 'Provider Business Practice Location Address Postal Code')
]

# Pair RDD keyed by source column name, valued by the short output name
rdd = sc.parallelize(MAP)
npi_key_value_rdd = rdd.map(lambda x: (x[1], x[0]))
keys = npi_key_value_rdd.keys().collect()
values = npi_key_value_rdd.values().collect()

npis = df.select(keys)

# Alias each source column to its short output name
project_list = [col(col_name).alias(npi_key_value_rdd.lookup(col_name)[0])
                for col_name in npis.columns]
npis = npis.select(*project_list)

# Keep Idaho rows and split the nine-digit ZIP into its 5- and 4-digit parts
npis = npis \
    .filter(npis.practice_state == 'ID') \
    .withColumn("zip5", F.expr("substring(practice_zip, 1, 5)")) \
    .withColumn("zip4", F.expr("substring(practice_zip, 6, 4)"))
In the MAP list, each tuple's second element is the source column and the first is the output column name. Rather than rename each column directly in a withColumn call, I defined the map once, converted it to a pair RDD, and used it to build the project_list passed to select.
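The same rename can also be done without the RDD round trip; here is a sketch using a plain list comprehension over MAP and selectExpr, which produces an equivalent result (only the first two columns are shown for brevity):

```python
# Same (output name, source column) shape as the MAP above, truncated here
MAP = [
    ('npi', 'NPI'),
    ('practice_state', 'Provider Business Practice Location Address State Name'),
]

# Backtick-quote the source names since they contain spaces
exprs = ["`{}` AS {}".format(source, alias) for alias, source in MAP]
# npis = df.selectExpr(*exprs)  # same result as select(*project_list)
```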
Define a UDF to Call the Census API
Census.gov provides a free endpoint that geocodes an address to lon and lat values. If a match is found, an object like the following is returned. You can find more at the Census Geocoding Services site.
return {
    'lon': x,
    'lat': y,
    'tiger_side': tiger_side,
    'tiger_line_id': tiger_line_id,
    'response': response_text,
    'status': status,
    'status_message': status_message
}
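The coordinates and TIGER/Line fields sit a few levels deep in the response JSON. This small standalone parser (a hypothetical helper, mirroring the extraction done inside the UDF) makes the shape concrete; the sample payload is illustrative, not real geocoder output:

```python
def parse_geocode_result(result):
    """Extract lon/lat and TIGER/Line data from the 'result' object
    of a Census onelineaddress response; return None if no match."""
    matches = result.get('addressMatches') or []
    if not matches:
        return None
    first = matches[0]
    return {
        'lon': first['coordinates']['x'],
        'lat': first['coordinates']['y'],
        'tiger_side': first['tigerLine']['side'],
        'tiger_line_id': first['tigerLine']['tigerLineId'],
    }

sample = {
    'addressMatches': [{
        'coordinates': {'x': -116.19, 'y': 43.60},
        'tigerLine': {'side': 'L', 'tigerLineId': '101'},
    }]
}
parsed = parse_geocode_result(sample)
```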
To call the Census API, I created a Python UDF, used in a withColumn call to add a latlon column. Here is the UDF:
import urllib.parse

import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# lat/lon are nullable: they stay None when the lookup fails
coord = StructType([
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("tiger_side", StringType(), True),
    StructField("tiger_line_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("status_message", StringType(), True),
    StructField("response", StringType(), True)
])
@udf(returnType=coord)
def get_census_lat_lon(arr):
    npi, address, address_two, city, state, zipcode = arr

    # Build the one-line address, including the second address line if present
    oa = "{}, {}, {} {}".format(address, city, state, zipcode)
    if address_two:
        oa = "{} {}, {}, {} {}".format(address, address_two, city, state, zipcode)
    oa_url = urllib.parse.quote(oa)

    # Defaults, overwritten on a successful match
    x = None
    y = None
    tiger_side = None
    tiger_line_id = None
    response_text = None
    status = 'FAIL'
    status_message = None

    try:
        request_uri = "https://geocoding.geo.census.gov/geocoder/locations/onelineaddress?address={}&benchmark=2020&format=json".format(oa_url)
        response = requests.get(request_uri)
        response_json = response.json()
        response_text = response.text

        if response.status_code == 200:
            result = response_json['result']
            # Validate that one or more address matches were returned
            if 'addressMatches' not in result or len(result['addressMatches']) == 0:
                status_message = "{} : addressMatches array empty in response".format(npi)
            else:
                first = result['addressMatches'][0]
                coords = first['coordinates']
                tiger = first['tigerLine']
                x = coords['x']
                y = coords['y']
                tiger_side = tiger['side']
                tiger_line_id = tiger['tigerLineId']
                status = 'PASS'
    except Exception as e:
        status_message = "{} : {}".format(npi, e)

    return {
        'lon': x,
        'lat': y,
        'tiger_side': tiger_side,
        'tiger_line_id': tiger_line_id,
        'response': response_text,
        'status': status,
        'status_message': status_message
    }
The UDF returns a coord object, defined above, which includes the lat, lon, tiger_side, tiger_line_id, status, status_message and response. The input is an array of npi, practice_address_1, practice_address_2, practice_city, practice_state and zip5 from the NPI file.
from pyspark.sql.functions import array, col

batch_df = batch_df \
    .withColumn("latlon", get_census_lat_lon(array(
        col("npi"), col("practice_address_1"), col("practice_address_2"),
        col("practice_city"), col("practice_state"), col("zip5")))) \
    .select(*npis.columns, "latlon.*")
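After flattening latlon.*, each row carries a status of PASS or FAIL, so failures can be split off for auditing or retry. The triage is just a partition, sketched here in plain Python over collected rows (the helper and sample rows are illustrative):

```python
def split_by_status(rows):
    """Partition geocode results into passed and failed lists;
    failed rows keep status_message for auditing or retry."""
    passed = [r for r in rows if r.get('status') == 'PASS']
    failed = [r for r in rows if r.get('status') != 'PASS']
    return passed, failed

rows = [
    {'npi': '1', 'status': 'PASS', 'lat': 43.6, 'lon': -116.2},
    {'npi': '2', 'status': 'FAIL',
     'status_message': '2 : addressMatches array empty in response'},
]
passed, failed = split_by_status(rows)
```

In Spark itself, the equivalent split is a filter on the flattened column, e.g. batch_df.filter(col("status") == "PASS"), applied before writing results out.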