Tuesday, February 3, 2009

Easy CSV Uploads? - Yes we can!

A few months ago Chet Justice and I did an ApEx presentation at our Oracle users group. He put together a demo of how one could parse a comma separated block of text "in memory". He did this via a pipelined function, which was a really neat trick with which he was able to emulate functionality similar to the ApEx Load utility that allows you to upload CSVs after pasting the text into a textarea.

I recently had to work on a project that required this functionality, only it was based on a file upload rather than input from a textarea. I started by revisiting Chet's example, modified the code to make it more generic and flexible, and then packaged it away. I did have to replace the pipelined function with a temporary table to utilize the bulk load while creating the collection which really improves performance with large files. The result is an easy to use package that allows one to populate ApEx collections from either file uploads or pastes into textareas.

Checkout the demo...

WARNING: THIS IS NOT PRODUCTION CODE. This package is young and although I've not had any issues, I can not guarantee that it is bug free. Please let me know if you encounter any issues. I'll eventually do another post in the future when the package is more mature.

Here's how to get it working...

  1. Compile this type:
      CREATE GLOBAL TEMPORARY TABLE "CSV_TEMP"
      ( "ATTR_01" VARCHAR2(4000 BYTE),
     "ATTR_02" VARCHAR2(4000 BYTE),
     "ATTR_03" VARCHAR2(4000 BYTE),
     "ATTR_04" VARCHAR2(4000 BYTE),
     "ATTR_05" VARCHAR2(4000 BYTE),
     "ATTR_06" VARCHAR2(4000 BYTE),
     "ATTR_07" VARCHAR2(4000 BYTE),
     "ATTR_08" VARCHAR2(4000 BYTE),
     "ATTR_09" VARCHAR2(4000 BYTE),
     "ATTR_10" VARCHAR2(4000 BYTE),
     "ATTR_11" VARCHAR2(4000 BYTE),
     "ATTR_12" VARCHAR2(4000 BYTE),
     "ATTR_13" VARCHAR2(4000 BYTE),
     "ATTR_14" VARCHAR2(4000 BYTE),
     "ATTR_15" VARCHAR2(4000 BYTE),
     "ATTR_16" VARCHAR2(4000 BYTE),
     "ATTR_17" VARCHAR2(4000 BYTE),
     "ATTR_18" VARCHAR2(4000 BYTE),
     "ATTR_19" VARCHAR2(4000 BYTE),
     "ATTR_20" VARCHAR2(4000 BYTE),
     "ATTR_21" VARCHAR2(4000 BYTE),
     "ATTR_22" VARCHAR2(4000 BYTE),
     "ATTR_23" VARCHAR2(4000 BYTE),
     "ATTR_24" VARCHAR2(4000 BYTE),
     "ATTR_25" VARCHAR2(4000 BYTE),
     "ATTR_26" VARCHAR2(4000 BYTE),
     "ATTR_27" VARCHAR2(4000 BYTE),
     "ATTR_28" VARCHAR2(4000 BYTE),
     "ATTR_29" VARCHAR2(4000 BYTE),
     "ATTR_30" VARCHAR2(4000 BYTE),
     "ATTR_31" VARCHAR2(4000 BYTE),
     "ATTR_32" VARCHAR2(4000 BYTE),
     "ATTR_33" VARCHAR2(4000 BYTE),
     "ATTR_34" VARCHAR2(4000 BYTE),
     "ATTR_35" VARCHAR2(4000 BYTE),
     "ATTR_36" VARCHAR2(4000 BYTE),
     "ATTR_37" VARCHAR2(4000 BYTE),
     "ATTR_38" VARCHAR2(4000 BYTE),
     "ATTR_39" VARCHAR2(4000 BYTE),
     "ATTR_40" VARCHAR2(4000 BYTE),
     "ATTR_41" VARCHAR2(4000 BYTE),
     "ATTR_42" VARCHAR2(4000 BYTE),
     "ATTR_43" VARCHAR2(4000 BYTE),
     "ATTR_44" VARCHAR2(4000 BYTE),
     "ATTR_45" VARCHAR2(4000 BYTE),
     "ATTR_46" VARCHAR2(4000 BYTE),
     "ATTR_47" VARCHAR2(4000 BYTE),
     "ATTR_48" VARCHAR2(4000 BYTE),
     "ATTR_49" VARCHAR2(4000 BYTE),
     "ATTR_50" VARCHAR2(4000 BYTE)
      ) ON COMMIT DELETE ROWS;
  2. Compile this package spec:
    create or replace
    PACKAGE csv AS
    /*****************************************************************************/
      /*
         Chet Justice, Daniel McGhan
        
         
            This procedure was designed to create and populate a collection based
            on a CSV CLOB. This was written specifically for use in ApEx and must
            be called from within a valid session.
         
        
         Modification History
         Date        By           Modification
         ----------- ------------ -----------------------------------------------
         
         27-JAN-2008 DMCGHAN      Initial Creation
              
      */
      PROCEDURE create_collection_from_clob (
         p_csv                IN CLOB
       , p_collection_name    IN VARCHAR2
       , p_header_included    IN CHAR := 'N'
       , p_delimiter          IN CHAR := ','
       , p_enclosed_by        IN CHAR := NULL
       , p_replace_collection IN CHAR := 'Y'
      );
     
    /*****************************************************************************/
    
      /*
         Daniel McGhan
        
         
            This procedure was designed to create and populate a collection based
            on a CSV BLOB. This was written specifically for use in ApEx and must
            be called from within a valid session.
         
        
         Modification History
         Date        By           Modification
         ----------- ------------ -----------------------------------------------
         
         27-JAN-2008 DMCGHAN      Initial Creation
              
      */
      PROCEDURE create_collection_from_blob (
         p_csv                IN BLOB
       , p_collection_name    IN VARCHAR2
       , p_header_included    IN CHAR := 'N'
       , p_delimiter          IN CHAR := ','
       , p_enclosed_by        IN CHAR := NULL
       , p_replace_collection IN CHAR := 'Y'
      );
     
    /*****************************************************************************/
    END csv;
  3. Compile this package body:
    create or replace
    PACKAGE BODY csv AS
    /*****************************************************************************/
    
      PROCEDURE assert (
         p_condition IN BOOLEAN
       , p_err_code  IN PLS_INTEGER
       , p_err_msg   IN VARCHAR2
      )
    
      IS
     
      BEGIN
     
         IF (NOT p_condition OR p_condition IS NULL)
         THEN
            raise_application_error(NVL(p_err_code ,-20001), p_err_msg);
         END IF;
        
      END assert;
     
    /*****************************************************************************/
    
      PROCEDURE create_collection_from_clob (
         p_csv                IN CLOB
       , p_collection_name    IN VARCHAR2
       , p_header_included    IN CHAR := 'N'
       , p_delimiter          IN CHAR := ','
       , p_enclosed_by        IN CHAR := NULL
       , p_replace_collection IN CHAR := 'Y'
      )
    
      AS
     
         TYPE csv_rt IS RECORD (
            attr_01 VARCHAR2(4000)
          , attr_02 VARCHAR2(4000)
          , attr_03 VARCHAR2(4000)
          , attr_04 VARCHAR2(4000)
          , attr_05 VARCHAR2(4000)
          , attr_06 VARCHAR2(4000)
          , attr_07 VARCHAR2(4000)
          , attr_08 VARCHAR2(4000)
          , attr_09 VARCHAR2(4000)
          , attr_10 VARCHAR2(4000)
          , attr_11 VARCHAR2(4000)
          , attr_12 VARCHAR2(4000)
          , attr_13 VARCHAR2(4000)
          , attr_14 VARCHAR2(4000)
          , attr_15 VARCHAR2(4000)
          , attr_16 VARCHAR2(4000)
          , attr_17 VARCHAR2(4000)
          , attr_18 VARCHAR2(4000)
          , attr_19 VARCHAR2(4000)
          , attr_20 VARCHAR2(4000)
          , attr_21 VARCHAR2(4000)
          , attr_22 VARCHAR2(4000)
          , attr_23 VARCHAR2(4000)
          , attr_24 VARCHAR2(4000)
          , attr_25 VARCHAR2(4000)
          , attr_26 VARCHAR2(4000)
          , attr_27 VARCHAR2(4000)
          , attr_28 VARCHAR2(4000)
          , attr_29 VARCHAR2(4000)
          , attr_30 VARCHAR2(4000)
          , attr_31 VARCHAR2(4000)
          , attr_32 VARCHAR2(4000)
          , attr_33 VARCHAR2(4000)
          , attr_34 VARCHAR2(4000)
          , attr_35 VARCHAR2(4000)
          , attr_36 VARCHAR2(4000)
          , attr_37 VARCHAR2(4000)
          , attr_38 VARCHAR2(4000)
          , attr_39 VARCHAR2(4000)
          , attr_40 VARCHAR2(4000)
          , attr_41 VARCHAR2(4000)
          , attr_42 VARCHAR2(4000)
          , attr_43 VARCHAR2(4000)
          , attr_44 VARCHAR2(4000)
          , attr_45 VARCHAR2(4000)
          , attr_46 VARCHAR2(4000)
          , attr_47 VARCHAR2(4000)
          , attr_48 VARCHAR2(4000)
          , attr_49 VARCHAR2(4000)
          , attr_50 VARCHAR2(4000)
         );
     
         l_csv              CLOB := 'X';
         l_csv_length       PLS_INTEGER;
         l_newline_position PLS_INTEGER := 1;
         l_record_length    PLS_INTEGER;
         l_record_count     PLS_INTEGER := 0;
         l_record_text      VARCHAR2(32767);
         l_column_count     PLS_INTEGER := 0;
         l_column_text      VARCHAR2(4000);
         l_record           CSV_RT;
         l_delimiter        CHAR(1);
         l_header_included  CHAR(1);
         l_stage            VARCHAR2(32767);
         
         PROCEDURE populate_attr (
            p_attr_num IN PLS_INTEGER
          , p_attr_val IN VARCHAR2
         )
         IS
         BEGIN
            CASE p_attr_num
               WHEN 1 THEN l_record.attr_01 := p_attr_val;
               WHEN 2 THEN l_record.attr_02 := p_attr_val;
               WHEN 3 THEN l_record.attr_03 := p_attr_val;
               WHEN 4 THEN l_record.attr_04 := p_attr_val;
               WHEN 5 THEN l_record.attr_05 := p_attr_val;
               WHEN 6 THEN l_record.attr_06 := p_attr_val;
               WHEN 7 THEN l_record.attr_07 := p_attr_val;
               WHEN 8 THEN l_record.attr_08 := p_attr_val;
               WHEN 9 THEN l_record.attr_09 := p_attr_val;
               WHEN 10 THEN l_record.attr_10 := p_attr_val;
               WHEN 11 THEN l_record.attr_11 := p_attr_val;
               WHEN 12 THEN l_record.attr_12 := p_attr_val;
               WHEN 13 THEN l_record.attr_13 := p_attr_val;
               WHEN 14 THEN l_record.attr_14 := p_attr_val;
               WHEN 15 THEN l_record.attr_15 := p_attr_val;
               WHEN 16 THEN l_record.attr_16 := p_attr_val;
               WHEN 17 THEN l_record.attr_17 := p_attr_val;
               WHEN 18 THEN l_record.attr_18 := p_attr_val;
               WHEN 19 THEN l_record.attr_19 := p_attr_val;
               WHEN 20 THEN l_record.attr_20 := p_attr_val;
               WHEN 21 THEN l_record.attr_21 := p_attr_val;
               WHEN 22 THEN l_record.attr_22 := p_attr_val;
               WHEN 23 THEN l_record.attr_23 := p_attr_val;
               WHEN 24 THEN l_record.attr_24 := p_attr_val;
               WHEN 25 THEN l_record.attr_25 := p_attr_val;
               WHEN 26 THEN l_record.attr_26 := p_attr_val;
               WHEN 27 THEN l_record.attr_27 := p_attr_val;
               WHEN 28 THEN l_record.attr_28 := p_attr_val;
               WHEN 29 THEN l_record.attr_29 := p_attr_val;
               WHEN 30 THEN l_record.attr_30 := p_attr_val;
               WHEN 31 THEN l_record.attr_31 := p_attr_val;
               WHEN 32 THEN l_record.attr_32 := p_attr_val;
               WHEN 33 THEN l_record.attr_33 := p_attr_val;
               WHEN 34 THEN l_record.attr_34 := p_attr_val;
               WHEN 35 THEN l_record.attr_35 := p_attr_val;
               WHEN 36 THEN l_record.attr_36 := p_attr_val;
               WHEN 37 THEN l_record.attr_37 := p_attr_val;
               WHEN 38 THEN l_record.attr_38 := p_attr_val;
               WHEN 39 THEN l_record.attr_39 := p_attr_val;
               WHEN 40 THEN l_record.attr_40 := p_attr_val;
               WHEN 41 THEN l_record.attr_41 := p_attr_val;
               WHEN 42 THEN l_record.attr_42 := p_attr_val;
               WHEN 43 THEN l_record.attr_43 := p_attr_val;
               WHEN 44 THEN l_record.attr_44 := p_attr_val;
               WHEN 45 THEN l_record.attr_45 := p_attr_val;
               WHEN 46 THEN l_record.attr_46 := p_attr_val;
               WHEN 47 THEN l_record.attr_47 := p_attr_val;
               WHEN 48 THEN l_record.attr_48 := p_attr_val;
               WHEN 49 THEN l_record.attr_49 := p_attr_val;
               WHEN 50 THEN l_record.attr_50 := p_attr_val;
            END CASE;
         END populate_attr;
    
         PROCEDURE queue_record
         IS
         BEGIN
            l_record_count := l_record_count + 1;
            l_record_text := dbms_lob.substr(
               l_csv
             , dbms_lob.instr(l_csv, CHR(10), l_newline_position) - l_newline_position
             , l_newline_position
            );
            l_record_length := LENGTH(l_record_text);
            l_newline_position := l_newline_position + l_record_length + 1;
         END queue_record;
        
         PROCEDURE process_non_enclosed_record
         IS
            l_char_text CHAR(1);
         BEGIN
            FOR i IN 1 .. l_record_length
            LOOP
               l_char_text := SUBSTR(l_record_text, i, 1);
               IF l_char_text != l_delimiter OR l_record_length = i
               THEN
                  l_column_text := l_column_text || l_char_text;
                 
                  IF l_record_length = i
                  THEN
                     populate_attr(l_column_count + 1, l_column_text);
                     INSERT INTO csv_temp VALUES l_record;
                    
                     l_column_count := 0;
                     l_column_text := NULL;
                     EXIT;
                  END IF;
               ELSE
                  l_column_count := l_column_count + 1;
                  populate_attr(l_column_count, l_column_text);
                 
                  l_column_text := NULL;
               END IF;
            END LOOP;
         END process_non_enclosed_record;
        
         PROCEDURE process_enclosed_record
         IS
            l_char_text      CHAR(1);
            l_last_char_text CHAR(1);
            l_next_char_text CHAR(1);
            l_encloser_on    BOOLEAN := FALSE;
         BEGIN
            FOR i IN 1 .. l_record_length
            LOOP
               l_last_char_text := l_char_text;
               l_char_text := SUBSTR(l_record_text, i, 1);
               l_next_char_text := SUBSTR(l_record_text, i + 1, 1);
               l_encloser_on := (
                  (l_char_text = p_enclosed_by AND (l_last_char_text = l_delimiter OR i = 1))
                  OR (l_char_text != p_enclosed_by AND l_encloser_on)
               );
              
               IF (l_char_text != l_delimiter AND l_char_text != p_enclosed_by)
                  OR (l_char_text = l_delimiter AND l_encloser_on)
                  OR (l_char_text = p_enclosed_by AND l_next_char_text != l_delimiter AND l_last_char_text != l_delimiter AND i != 1)
                  OR l_record_length = i
               THEN
                  l_column_text := l_column_text || l_char_text;
                 
                  IF l_record_length = i
                  THEN
                     populate_attr(l_column_count + 1, l_column_text);
                     INSERT INTO csv_temp VALUES l_record;
                    
                     l_column_count := 0;
                     l_column_text := NULL;
                     EXIT;
                  END IF;
               ELSIF NOT (
                     (l_char_text = p_enclosed_by AND l_next_char_text = l_delimiter)
                  OR (l_char_text = p_enclosed_by AND l_last_char_text = l_delimiter)
                  OR (i = 1 AND l_char_text != l_delimiter)
               )
               THEN
                  l_column_count := l_column_count + 1;
                  populate_attr(l_column_count, l_column_text);
                 
                  l_column_text := NULL;
               END IF;
            END LOOP;
         END;
    
      BEGIN
     
         l_stage := 'Checking pre-conditions';
         assert(p_csv IS NOT NULL, -20001, 'p_csv must be NOT NULL');
         assert(p_collection_name IS NOT NULL, -20002, 'p_collection_name must be NOT NULL');
         assert(p_header_included IS NOT NULL, -20003, 'p_header_included must be NOT NULL');
         assert(p_header_included IN ('Y','N'), -20004, 'p_header_included must be ''Y'' or ''N''');
         assert(p_delimiter IS NOT NULL, -20005, 'p_delimiter must be NOT NULL');
         assert(p_replace_collection IS NOT NULL, -20006, 'p_replace_collection must be NOT NULL');
         assert(p_replace_collection IN ('Y','N'), -20007, 'p_replace_collection must be ''Y'' or ''N''');
     
         l_stage := 'Setting defaults';
         l_delimiter :=
            CASE
               WHEN p_delimiter IS NULL THEN ','
               ELSE p_delimiter
            END;
        
         l_stage := 'Copying passed in CLOB to local CLOB';
         dbms_lob.copy(l_csv, p_csv, dbms_lob.getlength(p_csv));
        
         l_stage := 'Checking to see if passed in CLOB ends with a CR';
         IF dbms_lob.instr(p_csv, CHR(10), dbms_lob.getlength(p_csv) - 1) = 0
         THEN
            l_stage := 'Appending CR to local CLOB';
            dbms_lob.append(l_csv, CHR(10));
         END IF;
              
         l_stage := 'Getting the length of the local CLOB';
         l_csv_length := dbms_lob.getlength(l_csv);
        
         l_stage := 'Queuing the first record';
         queue_record;
        
         l_stage := 'Checking to see if first record should be part of data';
         IF p_header_included = 'N'
         THEN
            l_stage := 'Processing first record';
            IF p_enclosed_by IS NULL
            THEN
               process_non_enclosed_record;
            ELSE
               process_enclosed_record;
            END IF;
         END IF;
        
         l_stage := 'Processing the rest of the records';
         IF p_enclosed_by IS NULL
         THEN
            WHILE l_newline_position < l_csv_length
            LOOP
               queue_record;
               process_non_enclosed_record;
            END LOOP;
         ELSE
            WHILE l_newline_position < l_csv_length
            LOOP
               queue_record;
               process_enclosed_record;
            END LOOP;
         END IF;
    
         IF p_replace_collection = 'Y' AND apex_collection.collection_exists(p_collection_name)
         THEN
            l_stage := 'Removing existing collection';
            apex_collection.delete_collection(p_collection_name);
         END IF;
        
         l_stage := 'Creating collection from CSV_TEMP table';
         apex_collection.create_collection_from_query_b(
            p_collection_name => p_collection_name
          , p_query           => 'SELECT * FROM csv_temp'
         );
    
      END create_collection_from_clob;
    
    /*****************************************************************************/
    
      PROCEDURE create_collection_from_blob (
         p_csv                IN BLOB
       , p_collection_name    IN VARCHAR2
       , p_header_included    IN CHAR := 'N'
       , p_delimiter          IN CHAR := ','
       , p_enclosed_by        IN CHAR := NULL
       , p_replace_collection IN CHAR := 'Y'
      )
    
      AS
    
         l_csv_clob     CLOB := 'X';
         l_dest_offset  INTEGER := 1;
         l_src_offset   INTEGER := 1;
         l_lang_context INTEGER := dbms_lob.default_lang_ctx;
         l_warning      INTEGER;
         l_stage        VARCHAR2(32767);
    
      BEGIN
     
         l_stage := 'Checking pre-conditions';
         assert(p_csv IS NOT NULL, -20001, 'p_csv must be NOT NULL');
         assert(p_collection_name IS NOT NULL, -20002, 'p_collection_name must be NOT NULL');
         assert(p_header_included IS NOT NULL, -20003, 'p_header_included must be NOT NULL');
         assert(p_header_included IN ('Y','N'), -20004, 'p_header_included must be ''Y'' or ''N''');
         assert(p_delimiter IS NOT NULL, -20005, 'p_delimiter must be NOT NULL');
         assert(p_replace_collection IS NOT NULL, -20006, 'p_replace_collection must be NOT NULL');
         assert(p_replace_collection IN ('Y','N'), -20007, 'p_replace_collection must be ''Y'' or ''N''');
    
         l_stage := 'Converting BLOB to CLOB';
         dbms_lob.converttoclob (
            dest_lob     => l_csv_clob
          , src_blob     => p_csv
          , amount       => dbms_lob.lobmaxsize
          , dest_offset  => l_dest_offset
          , src_offset   => l_src_offset
          , blob_csid    => dbms_lob.default_csid
          , lang_context => l_lang_context
          , warning      => l_warning
         );
    
         l_stage := 'Calling create_collection_from_clob to process CLOB';
         create_collection_from_clob(
            p_csv                => l_csv_clob
          , p_collection_name    => p_collection_name
          , p_header_included    => p_header_included
          , p_delimiter          => p_delimiter
          , p_enclosed_by        => p_enclosed_by
          , p_replace_collection => p_replace_collection
         );
    
      END create_collection_from_blob;
    
    /*****************************************************************************/
    END csv;
  4. Create an application page with a File Browse item and adapt the following code to meet your needs.
    DECLARE
    
      l_blob BLOB;
    
      PROCEDURE cleanup
      IS
      BEGIN
         DELETE FROM apex_application_files
         WHERE name = :PXX_FILE_LOCATION;
      END cleanup;
    
    BEGIN
    
      SELECT blob_content
      INTO l_blob
      FROM apex_application_files
      WHERE name = :PXX_FILE_LOCATION;
     
      csv.create_collection_from_blob(l_blob, 'CSV_UPLOAD', 'Y');
    
      cleanup;
    
    EXCEPTION
    
      WHEN OTHERS
      THEN
         cleanup;
        
         RAISE;
    
    END;