77
88import codecs
99import copy
10- import json
1110import os
11+ import tempfile
12+ import uuid
1213from collections import OrderedDict
1314from decimal import Decimal
1415from warnings import warn
1516
17+ import BTrees .OOBTree
18+ import ijson
19+ import transaction
1620import xmltodict
21+ import zc .zlibstorage
22+ import ZODB .FileStorage
1723
1824from flattentool .i18n import _
1925from flattentool .input import path_search
2026from flattentool .schema import make_sub_sheet_name
21- from flattentool .sheet import Sheet
27+ from flattentool .sheet import PersistentSheet
2228
2329BASIC_TYPES = [str , bool , int , Decimal , type (None )]
2430
@@ -112,9 +118,26 @@ def __init__(
112118 remove_empty_schema_columns = False ,
113119 rollup = False ,
114120 truncation_length = 3 ,
121+ persist = False ,
115122 ):
123+ if persist :
124+ self .zodb_db_location = (
125+ tempfile .gettempdir () + "/flattentool-" + str (uuid .uuid4 ())
126+ )
127+ zodb_storage = zc .zlibstorage .ZlibStorage (
128+ ZODB .FileStorage .FileStorage (self .zodb_db_location )
129+ )
130+ self .db = ZODB .DB (zodb_storage )
131+ else :
132+ # If None, in memory storage is used.
133+ self .db = ZODB .DB (None )
134+
135+ self .connection = self .db .open ()
136+ root = self .connection .root
137+ root .sheet_store = BTrees .OOBTree .BTree ()
138+
116139 self .sub_sheets = {}
117- self .main_sheet = Sheet ( )
140+ self .main_sheet = PersistentSheet ( connection = self . connection , name = "" )
118141 self .root_list_path = root_list_path
119142 self .root_id = root_id
120143 self .use_titles = use_titles
@@ -125,9 +148,17 @@ def __init__(
125148 self .filter_value = filter_value
126149 self .remove_empty_schema_columns = remove_empty_schema_columns
127150 self .seen_paths = set ()
151+ self .persist = persist
128152
129153 if schema_parser :
130- self .main_sheet = copy .deepcopy (schema_parser .main_sheet )
154+ self .main_sheet = PersistentSheet .from_sheet (
155+ schema_parser .main_sheet , self .connection
156+ )
157+ for sheet_name , sheet in list (self .sub_sheets .items ()):
158+ self .sub_sheets [sheet_name ] = PersistentSheet .from_sheet (
159+ sheet , self .connection
160+ )
161+
131162 self .sub_sheets = copy .deepcopy (schema_parser .sub_sheets )
132163 if remove_empty_schema_columns :
133164 # Don't use columns from the schema parser
@@ -194,18 +225,13 @@ def __init__(
194225 _ ("Only one of json_file or root_json_dict should be supplied" )
195226 )
196227
197- if json_filename :
198- with codecs .open (json_filename , encoding = "utf-8" ) as json_file :
199- try :
200- self .root_json_dict = json .load (
201- json_file , object_pairs_hook = OrderedDict , parse_float = Decimal
202- )
203- except UnicodeError as err :
204- raise BadlyFormedJSONErrorUTF8 (* err .args )
205- except ValueError as err :
206- raise BadlyFormedJSONError (* err .args )
207- else :
208- self .root_json_dict = root_json_dict
228+ if not json_filename :
229+ if self .root_list_path is None :
230+ self .root_json_list = root_json_dict
231+ else :
232+ self .root_json_list = path_search (
233+ root_json_dict , self .root_list_path .split ("/" )
234+ )
209235
210236 if preserve_fields :
211237 # Extract fields to be preserved from input file (one path per line)
@@ -240,19 +266,37 @@ def __init__(
240266 self .preserve_fields = None
241267 self .preserve_fields_input = None
242268
269+ if json_filename :
270+ if self .root_list_path is None :
271+ path = "item"
272+ else :
273+ path = root_list_path .replace ("/" , "." ) + ".item"
274+
275+ json_file = codecs .open (json_filename , encoding = "utf-8" )
276+
277+ self .root_json_list = ijson .items (json_file , path , map_type = OrderedDict )
278+
279+ try :
280+ self .parse ()
281+ except ijson .common .IncompleteJSONError as err :
282+ raise BadlyFormedJSONError (* err .args )
283+ except UnicodeDecodeError as err :
284+ raise BadlyFormedJSONErrorUTF8 (* err .args )
285+ finally :
286+ if json_filename :
287+ json_file .close ()
288+
243289 def parse (self ):
244- if self .root_list_path is None :
245- root_json_list = self .root_json_dict
246- else :
247- root_json_list = path_search (
248- self .root_json_dict , self .root_list_path .split ("/" )
249- )
250- for json_dict in root_json_list :
290+ for num , json_dict in enumerate (self .root_json_list ):
251291 if json_dict is None :
252292 # This is particularly useful for IATI XML, in order to not
253293 # fall over on empty activity, e.g. <iati-activity/>
254294 continue
255295 self .parse_json_dict (json_dict , sheet = self .main_sheet )
296+ if num % 2000 == 0 and num != 0 :
297+ transaction .commit ()
298+
299+ transaction .commit ()
256300
257301 if self .remove_empty_schema_columns :
258302 # Remove sheets with no lines of data
@@ -501,7 +545,9 @@ def parse_json_dict(
501545 parent_name , key , truncation_length = self .truncation_length
502546 )
503547 if sub_sheet_name not in self .sub_sheets :
504- self .sub_sheets [sub_sheet_name ] = Sheet (name = sub_sheet_name )
548+ self .sub_sheets [sub_sheet_name ] = PersistentSheet (
549+ name = sub_sheet_name , connection = self .connection
550+ )
505551
506552 for json_dict in value :
507553 if json_dict is None :
@@ -518,4 +564,16 @@ def parse_json_dict(
518564 raise ValueError (_ ("Unsupported type {}" ).format (type (value )))
519565
520566 if top :
521- sheet .lines .append (flattened_dict )
567+ sheet .append_line (flattened_dict )
568+
def __enter__(self):
    """Enter the ``with`` block and hand back this same object.

    No setup happens here; the object is returned unchanged so it can be
    bound by ``with ... as name``.
    """
    return self
571+
def __exit__(self, type, value, traceback):
    """Release the on-disk ZODB store when leaving the ``with`` block.

    Nothing is done unless ``self.persist`` is true. In that case
    ``self.connection`` and ``self.db`` are closed, and the storage file at
    ``self.zodb_db_location`` is deleted together with its ``.lock``,
    ``.index`` and ``.tmp`` companion files.

    The three arguments are the usual context-manager exception details and
    are not inspected. The method returns None, so an exception raised
    inside the ``with`` block keeps propagating.
    """
    if not self.persist:
        return

    try:
        self.connection.close()
        self.db.close()
    finally:
        # Always attempt the file cleanup, even if closing failed, so that
        # temporary databases do not pile up in the temp directory.
        for suffix in ("", ".lock", ".index", ".tmp"):
            try:
                os.remove(self.zodb_db_location + suffix)
            except FileNotFoundError:
                # A companion file that was never written (or is already
                # gone) must not stop the remaining files being removed,
                # nor hide an exception coming from the ``with`` body.
                pass
0 commit comments