Automatic CSV to SQL ETL in Clojure

By Ivar Thorson

This is another basic example to help beginners get started in Clojure, which is still my favorite programming language almost 10 years after I first started using it. Like all languages, Clojure has some warts. But it also has a lot of very powerful and abstract concepts that many programmers have not seen before, and, as with most mathematical concepts, the power of such abstractions is greatly underestimated by many people. Even fancy-sounding but easy-to-understand concepts like homoiconicity are still underappreciated in the programming community.

Compared with other mathematically-oriented languages like Haskell, Clojure is also arguably a better fit for practical tasks: it’s designed for massive concurrency, and it has access to more libraries than any other language. This is a bold but probably true statement because, in addition to its own libraries, Clojure can use Java libraries on the back end and JavaScript libraries on the front end (via ClojureScript), and those are two of the most popular languages in existence.

TL;DR

In this article, I’ll present a simple example of setting up a SQL database from a bunch of CSV files. This is called an ETL job (Extract, Transform, Load), which is a really common pattern in software.

Since most ETL jobs like this are quite simple, let’s make a trivial example slightly more realistic by trying to infer the database schema automatically. Let’s also try to make it fast enough to be usable for files with a few million rows in them.

An overview of the approach that we will follow is:

  1. Group CSVs by directory, such that each directory corresponds to a SQL table that we would like to create.

  2. Scan the directory tree to build a list of the CSV files we want to load.

  3. Autodetect the data type of each column across all files in each directory, and store the schema just outside the directory, so that it can be modified as needed.

  4. Create the table from the autodetected schema.

  5. Load the CSV files into SQL tables.

In past articles, we discussed how to read in CSVs and how to create a Postgres database with Docker. I’ll assume that you have read those already and that you have a Postgres instance running in Docker, so that we can get on with the ETL-specific code.

1. Group CSVs in Directories

Let’s go download some census data from 2010. The census.ire.org website has a handy tool that lets you download census data for each state. Since this is an example, we will only use five states’ worth of data:

  1. Alabama
  2. Alaska
  3. Arizona
  4. Arkansas
  5. California

Just making one table from a few small CSVs isn’t going to fully demonstrate this example, so let’s also fetch a much bigger dataset…the past 15 years of crime reports from Los Angeles, courtesy of data.gov.

  1. “Crime Data from 2010 to Present”

Note that this file is MUCH larger than the census data: it is 6.8 million lines long, has about 20 columns, and is 1.5 GB in size. It will be a better performance benchmark than those short census files.

Now place all the files in a directory tree like this:

sqlcsv/
├── census
│   ├── all_060_in_01.P1.csv
│   ├── all_060_in_02.P1.csv
│   ├── all_060_in_04.P1.csv
│   ├── all_060_in_05.P1.csv
│   └── all_060_in_06.P1.csv
└── crimes
    └── Crimes_-_2001_to_present.csv

2. Scan directories

This is pretty easy using file-seq. All we have to do is create a few functions for listing files and subdirectories.

(ns net.roboloco.files
  (:require [clojure.java.io]))

(defn list-files
  "Lists only the files in the directory string DIR."
  [dir]
  (->> (file-seq (clojure.java.io/file dir))
       (remove #(.isDirectory ^java.io.File %))))

(defn list-subdirectories
  "Lists only the subdirectories of the directory string DIR."
  [dir]
  (->> (file-seq (clojure.java.io/file dir))
       (filter #(.isDirectory ^java.io.File %))
       (remove #(= % (clojure.java.io/file dir)))))

(defn has-suffix?
  "Works on file object types."
  [ ^String suffix ^java.io.File file]
  (and (.isFile file)
       (re-find (re-pattern (str ".*\\." suffix "$")) (.getName file))))

(defn list-files-of-type 
  "Lists all files in the directory with the extension ext."
  [dir ext]
  (->> (file-seq (clojure.java.io/file dir))
       (filter (partial has-suffix? ext))))
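
To see the directory-per-table idea in action without touching real data, here is a minimal, self-contained sketch. The helper names make-demo-tree! and csv-names-by-table are made up for this illustration and are not part of the namespace above; the sketch builds a throwaway directory tree and groups the CSV files it finds by their parent directory.

```clojure
(require '[clojure.java.io :as io])
(import '(java.nio.file Files)
        '(java.nio.file.attribute FileAttribute))

(defn make-demo-tree!
  "Hypothetical helper: creates a throwaway directory tree and returns its root File."
  []
  (let [root (.toFile (Files/createTempDirectory "sqlcsv" (make-array FileAttribute 0)))]
    (doseq [path ["census/a.csv" "census/notes.txt" "crimes/b.csv"]]
      (let [f (io/file root path)]
        (io/make-parents f)          ; creates census/ and crimes/ as needed
        (spit f "id\n1\n")))
    root))

(defn csv-names-by-table
  "Hypothetical helper: maps each subdirectory name to the sorted names of
  the .csv files directly inside it."
  [root]
  (->> (file-seq (io/file root))
       (filter #(.isFile ^java.io.File %))
       (filter #(.endsWith (.getName ^java.io.File %) ".csv"))
       (group-by #(.getName (.getParentFile ^java.io.File %)))
       (reduce-kv (fn [m table files]
                    (assoc m table (sort (map (fn [^java.io.File f] (.getName f)) files))))
                  {})))

(csv-names-by-table (make-demo-tree!))
;; => {"census" ("a.csv"), "crimes" ("b.csv")}
```

The notes.txt file is ignored, and each key of the resulting map corresponds to one table that we would like to create.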

Some string-cleaning utility functions will also come in handy:

(ns net.roboloco.util
  (:require [clojure.string]))

(set! *warn-on-reflection* true)

(defn alphanumeric?
  "TRUE when the string contains only letters, digits, and underscores."
  [string]
  (= string (apply str (re-seq #"[a-z_A-Z0-9]" string))))

(defn spaces-to-underscores
  "Converts spaces to underscores."
  [string]
  (clojure.string/replace string #"\s" "_"))

(defn periods-to-underscores
  "Converts periods to underscores."
  [string]
  (clojure.string/replace string #"\." "_"))
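
As a quick illustration of what these cleaners do to a CSV header, here is a small self-contained sketch. The names clean-column-name and sql-safe? are condensed stand-ins written for this example, and the header strings are made up rather than taken from the real datasets.

```clojure
(require '[clojure.string :as str])

(defn clean-column-name
  "Hypothetical one-step version of the two cleaners above."
  [s]
  (-> s
      (str/replace #"\." "_")    ; periods -> underscores
      (str/replace #"\s" "_")))  ; whitespace -> underscores

(defn sql-safe?
  "Hypothetical check: only letters, digits, and underscores."
  [s]
  (boolean (re-matches #"[a-zA-Z0-9_]*" s)))

(def cleaned (mapv clean-column-name ["Case Number" "P001.001" "Lat/Long"]))

cleaned
;; => ["Case_Number" "P001_001" "Lat/Long"]

(remove sql-safe? cleaned)
;; => ("Lat/Long")
```

Anything that survives cleaning but still fails the check, like the slash here, has to be renamed by hand in the CSV header.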

We’ll also need some simple date-parsing functions:

(ns net.roboloco.dates
  "Code for handling strings representing dates and datetimes."
  (:require [java-time :as jt]))

(set! *warn-on-reflection* true)

(defn parse-date
  "Parses a standard date, like 2019-02-17."
  [s]
  (jt/local-date "yyyy-MM-dd" s))

(defn parse-datetime
  "Returns the datetime format that Python's pandas usually saves in."
  [s]
  (jt/local-date-time "yyyy-MM-dd HH:mm:ss" s))

(defn local-to-offset
  "Converts a local date time to an offset date time. By default, it assumes
  that the local time is UTC, but you may change this with optional arg TZ."
  [local-date-time & [tz]]
  (let [tz (or tz "UTC")]
    (-> local-date-time
        (jt/zoned-date-time tz)
        (jt/offset-date-time))))

(defn parse-RFC3339
  "Assuming a UTC datestamp with T and Z separator, for example:
  2019-01-17T22:03:16Z
  2019-01-17T22:03:16.383Z
  2019-01-17T22:03:16.111222333Z"
  [s]
  (local-to-offset
   (condp = (count s)
     20 (jt/local-date-time "yyyy-MM-dd'T'HH:mm:ss'Z'" s)
     24 (jt/local-date-time "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" s)
     27 (jt/local-date-time "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'" s)
     30 (jt/local-date-time "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'" s))))
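
For comparison, here is a minimal sketch of the same idea using plain java.time interop instead of the java-time wrapper. The function name parse-rfc3339-interop is made up for this example. It relies on OffsetDateTime/parse, whose default ISO format treats the fractional seconds as optional, so no dispatch on string length is needed. Unlike parse-RFC3339 above, it also accepts offsets other than Z.

```clojure
(import '(java.time OffsetDateTime ZoneOffset))

(defn parse-rfc3339-interop
  "Hypothetical alternative to parse-RFC3339 built on java.time directly."
  [^String s]
  (OffsetDateTime/parse s))

(def stamps ["2019-01-17T22:03:16Z"
             "2019-01-17T22:03:16.383Z"
             "2019-01-17T22:03:16.111222333Z"])

;; Nanosecond field of each parsed timestamp
(mapv (fn [s] (.getNano ^OffsetDateTime (parse-rfc3339-interop s))) stamps)
;; => [0 383000000 111222333]
```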

3. Autodetect the Schema

This is by far the most complex section of the program. For each element, the autodetector tests each of the parsing functions in *sql-types-and-parsers*, and the first one that works is considered the inferred SQL type. As I experimented with this at the REPL, I realized that testing every SQL parser on every element was prohibitively slow, so I defined guess-all-sql-types-in-column to reduce the rate of failed tests by remembering which parser last worked for each column.

Another note on optimization: although I initially assumed that I could make the CSV loading faster by scanning only the first N lines of each file, this ended up being error-prone in general, so I relented and allowed it to scan the whole file.

Note that this code only works for integers, floats, dates, datetimes, and strings (text), but you could easily extend it by adding more entries to *sql-types-and-parsers*. An exception is thrown if the types found for a column in different files cannot be reconciled.
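
Before reading the real code, it may help to see the two ideas above in miniature. The following is a simplified sketch, not the code used in this article: the parser table is a cut-down stand-in without dates, and the names attempts, works?, first-match, naive-types, sticky-types, and count-attempts are all made up for the illustration. It counts parser calls to show why remembering the last successful parser pays off.

```clojure
(def attempts (atom 0))   ; counts parser calls, for illustration only

(def parsers
  ;; Simplified, hypothetical stand-in for *sql-types-and-parsers*.
  [["NULL"             empty?]
   ["INTEGER"          #(Integer/parseInt %)]
   ["DOUBLE PRECISION" #(Double/parseDouble %)]
   ["TEXT"             str]])

(defn works?
  "True when parse-fn returns a truthy value for s without throwing."
  [parse-fn s]
  (swap! attempts inc)
  (boolean (try (parse-fn s) (catch Exception _ false))))

(defn first-match
  "Returns the first [sql-type parse-fn] row whose parser accepts s."
  [s]
  (some (fn [[_ parse-fn :as row]] (when (works? parse-fn s) row)) parsers))

(defn naive-types
  "Tries the parsers from the top for every single element."
  [column]
  (set (map (comp first first-match) column)))

(defn sticky-types
  "Retries the parser that worked last before falling back to a full scan."
  [column]
  (loop [strings (seq column), last-fn nil, found #{}]
    (if-let [s (first strings)]
      (if (and last-fn (works? last-fn s))
        (recur (next strings) last-fn found)
        (let [[sql-type parse-fn] (first-match s)]
          (recur (next strings) parse-fn (conj found sql-type))))
      found)))

(defn count-attempts
  "Returns [types-found number-of-parser-calls] for strategy f."
  [f column]
  (reset! attempts 0)
  [(f column) @attempts])

(mapv (comp first first-match) ["" "42" "3.14" "N/A"])
;; => ["NULL" "INTEGER" "DOUBLE PRECISION" "TEXT"]

(def column (repeat 1000 "0.5"))

(count-attempts naive-types column)   ;; => [#{"DOUBLE PRECISION"} 3000]
(count-attempts sticky-types column)  ;; => [#{"DOUBLE PRECISION"} 1002]
```

In the naive version every value fails the NULL and INTEGER tests before reaching the parser that works, and each failed INTEGER test also costs a trapped exception. The sticky version pays that price only once per change of type.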

(ns net.roboloco.guess-schema
  (:require [clojure.data.csv]
            [clojure.java.io]
            [clojure.set]
            [clojure.string]
            [net.roboloco.dates :as dates]
            [net.roboloco.files :as files]
            [net.roboloco.util :as util]))

(set! *warn-on-reflection* true)

(def ^:dynamic *sql-types-and-parsers*
  ;; This data structure defines all of the SQL data types, and the appropriate
  ;; function to use when parsing a string containing that data type.
  ;; Parsers will be tried in sequential order, and the first one that works is used.
  ;; 
  ;; SQL               String->CLJ Parser
  [["NULL"             #(or (nil? %) (empty? %))]
   ["INTEGER"          #(Integer/parseInt %)]
   ;; Parse as a double so that values keep full DOUBLE PRECISION accuracy
   ["DOUBLE PRECISION" #(Double/parseDouble %)]
   ["DATE"             dates/parse-date]
   ["TIMESTAMPTZ"      dates/parse-RFC3339]
   ["TEXT"             #(str %)]]) ;; this is always true, so is the "default" value

(defn guess-sql-parser
  "Given an unknown string, this fn runs through all of the SQL types & parsers in 
  sql-types-and-parsers and returns the first row with a working parser."
  [string]
  (loop [types-and-parsers *sql-types-and-parsers*]
    (when-let [[sql-type parse-fn :as typerow] (first types-and-parsers)]
      (if (try (parse-fn string)
               (catch Exception e false))
        typerow
        (recur (next types-and-parsers))))))

(defn guess-all-sql-types-in-column
  "Like guess-sql-parser, but an optimized version for looking at a whole column.
  In practice, this really reduces the number of tests and exceptions trapped
  over the simpler but much slower solution: 
         (set (map (comp first guess-sql-parser) seq-of-strings))"
  [seq-of-strings]
  (loop [strings seq-of-strings
         last-successful-parse-fn nil
         types-found #{}]
    (if-let [string (first strings)]
      ;; Only retry the previous parser if there is one, instead of relying
      ;; on a trapped NullPointerException for the very first element.
      (if (and last-successful-parse-fn
               (try (last-successful-parse-fn string)
                    (catch Exception e false)))
        (recur (next strings)  ; Previously successful parser worked again
               last-successful-parse-fn
               types-found)
        (if-let [[sql-type parse-fn] (guess-sql-parser string)]
          (recur (next strings)  ; A new working parser was found
                 parse-fn
                 (conj types-found sql-type))
          (recur (next strings)  ; No working parser found, move to next string
                 last-successful-parse-fn
                 (conj types-found nil))))
      types-found)))

(defn clean-column-names
  "Replaces whitespaces and periods in column names with underscores."
  [columns]
  (->> columns
       (map util/periods-to-underscores)
       (mapv util/spaces-to-underscores)))

(defn guess-csv-column-types
  "Returns a map of column name to the guessed SQL column type. Reads every
  row in the CSV, and returns all types found for each column. Works in 
  parallel and lazily on chunks of 10000 lines, to reduce the time to parse
  very large files."
  [csv-filepath]
  (println "Scanning:" csv-filepath)
  (with-open [reader (clojure.java.io/reader csv-filepath)]
     (let [rows (clojure.data.csv/read-csv reader)
           header (clean-column-names (first rows))
           data-rows (rest rows)
           chunk-size 10000]
       (->> data-rows          
            (partition-all chunk-size)
            (map #(apply map vector %)) ;; Convert list of rows into list of columns
            (map #(pmap guess-all-sql-types-in-column %))
            (map (fn [i data] (println (* chunk-size (inc i)) "rows scanned") data) (range))
            (apply map (fn [& args] (reduce clojure.set/union args)))
            (map vector header)
            (into {})))))

(defn scan-csvdir-and-make-schema
  "Scans every .csv file in CSVDIR, and returns a hashmap containing the
  schema of all the columns in the directory.
  If a non-alphanumeric column name is found, raises an exception. 
  If the schema is inconsistent, raises an exception."
  [csvdir]
  (let [csv-schemas (->> (files/list-files-of-type csvdir "csv")
                         (map guess-csv-column-types))
        columns (set (flatten (map keys csv-schemas)))
        problematic-columns (remove util/alphanumeric? columns)]
    (when-not (empty? problematic-columns)
      (throw (Exception. (str "Non-alphanumeric characters found in column names: "
                              (apply str (interpose ", "  problematic-columns))))))
    (into {} (for [col columns]
               (let [all-types-for-col (->> (map #(vec (get % col)) csv-schemas)
                                            (flatten)
                                            (remove nil?)
                                            (set))
                     nullable-suffix (if (get all-types-for-col "NULL") 
                                       " NULL"
                                       "")
                     types (disj all-types-for-col "NULL")]
                 (cond
                  (= 0 (count types))        [col nil]
                  (= 1 (count types))        [col (str (first types) nullable-suffix)]
                  ;; If it's mixed integer and float, make everything float
                  (= #{"INTEGER" "DOUBLE PRECISION"} types) 
                                             [col (str "DOUBLE PRECISION" nullable-suffix)]
                  ;; If the default type of TEXT is in there, choose text
                  (get types "TEXT") [col (str "TEXT" nullable-suffix)]
                  :otherwise  ;; Otherwise we have some weird error
                  (throw (Exception. (str "Inconsistent types across files for column: " 
                                           col (vec types))))))))))

(defn parse-csv-rows-using-schema
  "Lazily parse CSV-ROWS using the schema."
  [schema csv-rows]
  (let [header (clean-column-names (first csv-rows))
        ;; Schema types may carry a " NULL" suffix (e.g. "INTEGER NULL");
        ;; strip it so that the parser lookup below still succeeds.
        types  (map #(some-> (get schema %) (clojure.string/replace #" NULL$" ""))
                    header)
        empty-string-to-nil (fn [s] (if (and (string? s) (empty? s)) nil s))
        raw-rows (map #(map empty-string-to-nil %) (rest csv-rows))
        all-parsers (into {} *sql-types-and-parsers*)
        row-parsers (mapv #(get all-parsers %) types)
        typed-rows (for [raw-row raw-rows]
                     (map (fn [parse-fn element]
                            ;; Empty elements and columns without a parser become nil
                            (when (and parse-fn (seq element))
                              (try (parse-fn element)
                                   (catch Exception e
                                     (println "Schema:" schema)
                                     (println "Header:" header)
                                     (println "Raw row:" raw-row)
                                     (throw e)))))
                          row-parsers
                          raw-row))]
    [header typed-rows]))

(defn table-definition-sql-string
  "Returns a string suitable for creating a SQL table named TABLE-NAME, given
  a hashmap SCHEMA of column names to column types. The ENDING-STRING is appended
  to the end of the create table statement, if needed. "
  [table-name schema & [ending-string]]
  (let [ending-string (or ending-string "")
        col-defs (->> schema
                      (sort-by first)
                      (remove (comp nil? second))
                      (map (fn [[col type]] (format "\t%s %s" col type)))
                      (interpose ",\n")
                      (apply str))]
    (format "CREATE TABLE %s (\n%s %s\n);"
            table-name col-defs ending-string)))

We are now ready to actually do the autodetection! Let’s create our main namespace:

(ns net.roboloco.csv2sql
  (:gen-class)
  (:require [clojure.data.csv]
            [clojure.java.jdbc :as sql]
            [net.roboloco.guess-schema :as guess]
            [net.roboloco.files :as files]))

(set! *warn-on-reflection* true)

(defn table-schema-filename [dirname] (format "%s-schema.edn" dirname))
(defn table-sql-filename [dirname] (format "%s.sql" dirname))

(defn autodetect-sql-schemas!
  "Scans through the subdirectories of CSVDIR, infers the column data types,
  and stores the inferred schema in CSVDIR so that you may manually edit it
  before loading it in with MAKE-SQL-TABLES."
  [csvdir]
  (doseq [dir (files/list-subdirectories csvdir)]
    (printf "Autodetecting schema for: %s\n" dir)
    (let [tablename (.getName ^java.io.File dir)
          schema (guess/scan-csvdir-and-make-schema dir)]
      (when-not (empty? schema)
        (let [table-sql (guess/table-definition-sql-string tablename schema)]
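          ;; Build paths with clojure.java.io/file so that CSVDIR works with
          ;; or without a trailing slash.
          (println (str (clojure.java.io/file csvdir (table-schema-filename tablename))) schema)
          (spit (clojure.java.io/file csvdir (table-schema-filename tablename)) schema)
          (spit (clojure.java.io/file csvdir (table-sql-filename tablename)) table-sql))))))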

You may note that I’m storing the SQL schemas for each subdirectory in the root sqlcsv/ directory. This will let you hand-tune the schema as needed, if you want to make an index on one key or another, or make a particular column unique and required.
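
As a sketch of what such hand-tuning could look like, the snippet below uses a condensed variant of table-definition-sql-string so that it runs on its own. The name create-table-sql, the example schema, and the people table are all made up for this illustration. It shows how a nil type drops a column and how the optional ending string can carry a constraint.

```clojure
(require '[clojure.string :as str])

(defn create-table-sql
  "Hypothetical condensed variant of table-definition-sql-string."
  [table-name schema & [ending-string]]
  (let [col-defs (->> schema
                      (sort-by first)
                      (remove (comp nil? second))   ; skip columns with no detected type
                      (map (fn [[col type]] (str "\t" col " " type)))
                      (str/join ",\n"))]
    (format "CREATE TABLE %s (\n%s%s\n);" table-name col-defs (or ending-string ""))))

(def example-schema
  ;; Made-up schema in the same shape as the autodetected one
  {"id" "INTEGER", "name" "TEXT", "age" "INTEGER NULL", "notes" nil})

(println (create-table-sql "people" example-schema ",\n\tPRIMARY KEY (id)"))
;; CREATE TABLE people (
;;	age INTEGER NULL,
;;	id INTEGER,
;;	name TEXT,
;;	PRIMARY KEY (id)
;; );
```

Note that the ending string has to bring its own leading comma, since it is appended directly after the last column definition.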

4. Create the Autodetected Schema

With the schema autodetected, we now need to create the tables. Continuing along with the net.roboloco.csv2sql namespace, and assuming that you are using the same postgres database from a previous article:

(def default-db {:dbtype "postgresql" 
                 :dbname   (or (System/getenv "POSTGRES_DB")  "csv2sql")
                 :user     (or (System/getenv "POSTGRES_USER") "postgres")
                 :password (or (System/getenv "POSTGRES_PASS") "mysecretpassword")})

(defn connection-ok?
  "A predicate that tests if the database is connected."
  [db]
  (= {:result 15} (first (sql/query db ["select 3*5 as result"]))))

(defn drop-existing-sql-tables!
  "For each subdirectory in CSVDIR, drop any tables with the same name."
  [db csvdir]
  (doseq [table-name (map (fn [f] (.getName ^java.io.File f))
                          (files/list-subdirectories csvdir))]    
    (let [cmd (format "DROP TABLE IF EXISTS %s;" table-name) ]
      (sql/db-do-commands db cmd))))

(defn make-sql-tables!
  "Makes the SQL tables from the .sql files found in CSVDIR."
  [db csvdir]
  ;; Slurp the File objects directly: using only .getName would resolve each
  ;; path relative to the current working directory instead of CSVDIR.
  (doseq [sql-file (files/list-files-of-type csvdir "sql")]
    (let [table-sql (slurp sql-file)]
      (println table-sql)
      (sql/db-do-commands db table-sql))))

5. Load the CSV files into SQL

The final step is to load in the CSV file. As we do so, we need to parse the strings from the CSV using the schema so that they are converted into the proper data type for JDBC to properly insert them in Postgres.

(defn insert-csv!
  "Inserts the rows of the CSV into the database, converting the rows to the appropriate
  type as they are loaded. Lazy, so it works on very large files. If a column is not
  found in the schema, it is omitted and not inserted into the database. "
  [db table csvfile schema]
  (with-open [reader (clojure.java.io/reader csvfile)]
    (let [csv-rows (clojure.data.csv/read-csv reader)
          [header typed-rows] (guess/parse-csv-rows-using-schema schema csv-rows)
          cnt (atom 0)
          chunk-size 1000]
      (doseq [chunk-of-rows (partition-all chunk-size typed-rows)]
        (sql/insert-multi! db table header chunk-of-rows)
        ;; Report the running total only after the chunk has been inserted
        (println "Inserted" (swap! cnt + (count chunk-of-rows)) "rows")))))

(defn insert-all-csvs!
  "Loads all the subdirectories of CSVDIR as tables, using the schema file
  that AUTODETECT-SQL-SCHEMAS! stored for each subdirectory."
  [db csvdir]  
  (doseq [dirname (map (fn [f] (.getName ^java.io.File f))
                       (files/list-subdirectories csvdir))]
    (let [filepath (str csvdir "/" (table-schema-filename dirname))
          _ (println filepath)
          ;; The schema file holds a printed Clojure map, so read it back as
          ;; data; the raw string from slurp cannot be used for column lookups.
          schema (binding [*read-eval* false]
                   (read-string (slurp filepath)))]
      (when-not (empty? schema)
        (doseq [csvfile (files/list-files-of-type (str csvdir "/" dirname) "csv")]
          (println (format "Loading: %s" csvfile))
          (insert-csv! db dirname csvfile schema))))))

(defn -main
  []
  (let [csvdir (System/getenv "CSVDIR")
        db default-db]
    (when-not (connection-ok? db)
      (throw (Exception. (str "Unable to connect to DB:" db))))
    (autodetect-sql-schemas! csvdir)
    (make-sql-tables! db csvdir)
    (insert-all-csvs! db csvdir)
    (println "Done!")))
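
The chunking used by insert-csv! can be tried out without a database. The sketch below is an illustration only: insert-in-chunks! is a made-up helper, and an atom named batches stands in for the database by recording the size of every batch it receives.

```clojure
(defn insert-in-chunks!
  "Hypothetical helper: calls insert-fn! once per chunk of at most chunk-size
  rows and returns the total number of rows handed over."
  [insert-fn! chunk-size rows]
  (reduce (fn [total chunk]
            (insert-fn! chunk)
            (+ total (count chunk)))
          0
          (partition-all chunk-size rows)))

(def batches (atom []))   ; stands in for the database

(insert-in-chunks! #(swap! batches conj (count %)) 1000 (range 2500))
;; => 2500

@batches
;; => [1000 1000 500]
```

Because partition-all is lazy and keeps the final partial chunk, only one chunk of rows needs to be in memory at a time and no trailing rows are lost.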

Nothing left to do but try it out! The final step is to run (-main), either at the REPL or by building an uberjar with (-main) set as the entry point in project.clj (the ns form above already includes :gen-class), and then launching it with an environment variable that sets CSVDIR:

lein uberjar

CSVDIR=/path/to/your/sqlcsv/ java -jar target/csv2sql-0.1.0-SNAPSHOT-standalone.jar

Conclusion

The above is probably sufficient for this exercise – this ETL job will populate a database with a few million rows in a few minutes. On my laptop, it ingests about 5000-10000 rows per second, depending on the CSV.

Not bad for a couple hundred lines of code, but it could probably still be trimmed and simplified. The above code may be found in the csv2sql repo if you want to go further.

Some possible extensions to this would be:

  1. Warn the user if 99.9% of the elements of a column are of one type, but there are a few values that are of a different type.

  2. Add support for JSONs, rather than just CSVs. This would probably involve flattening nested JSONs so that {"a": {"b": 2}} would become {"a.b": 2}, and might involve generalizing the loader.

  3. If your database supports tens or hundreds of millions of rows, add support for Parquet files, a common tabular data format for big data.
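
For the JSON extension in item 2, the flattening step could look roughly like the sketch below. It assumes the JSON has already been parsed into nested Clojure maps by whatever JSON library you prefer, and the function name flatten-keys is made up for this example.

```clojure
(defn flatten-keys
  "Hypothetical helper: flattens nested maps into a single-level map whose
  keys are the dot-joined paths to each leaf value."
  ([m] (flatten-keys nil m))
  ([prefix m]
   (reduce-kv (fn [acc k v]
                (let [path (if prefix
                             (str prefix "." (name k))
                             (name k))]
                  (if (map? v)
                    (merge acc (flatten-keys path v))   ; recurse into nested maps
                    (assoc acc path v))))
              {}
              m)))

(flatten-keys {"a" {"b" 2}})
;; => {"a.b" 2}

(flatten-keys {"a" {"b" {"c" 1}} "d" 2})
;; => {"a.b.c" 1, "d" 2}
```

The dotted keys would then pass through the same column-name cleaning as CSV headers, which turns periods into underscores.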

This little program was focused on ETL, and we should probably stop at that. Rather than bolt on an HTTP CRUD API here, it might make more sense to put that functionality in a separate app.
