From 056d5dd9398cddf6d7bc80fa8f1c07e6bb40b605 Mon Sep 17 00:00:00 2001 From: Sean Quigley Date: Sat, 8 Jul 2017 13:45:43 -0400 Subject: [PATCH 1/6] add support for reading GIFs --- python/sparkdl/image/imageIO.py | 84 +++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 4 deletions(-) diff --git a/python/sparkdl/image/imageIO.py b/python/sparkdl/image/imageIO.py index 03c101cc..73c8be28 100644 --- a/python/sparkdl/image/imageIO.py +++ b/python/sparkdl/image/imageIO.py @@ -199,23 +199,78 @@ def _decodeImage(imageData): image = imageArrayToStruct(imgArray, mode.sparkMode) return image + +def _decodeGif(gifData): + """ + Decode compressed GIF data into a sequence of images. + + :param gifData: (bytes, bytearray) compressed GIF data in PIL compatible format. + :return: list of tuples of zero-indexed numbers and + DataFrame Rows of image structs: (idx, struct) + """ + try: + img = Image.open(BytesIO(gifData)) + except IOError: + return None + + if img.tile and img.tile[0] and img.tile[0][0] == "gif": + mode = pilModeLookup["RGB"] + else: + msg = "GIFs are not supported with image mode: {mode}" + warn(msg.format(mode=img.mode)) + return None + + frames = [] + i = 0 + mypalette = img.getpalette() + try: + while True: + img.putpalette(mypalette) + newImg = Image.new("RGB", img.size) + newImg.paste(img) + newImg.show() + + newImgArray = np.asarray(newImg) + newImage = imageArrayToStruct(newImgArray, mode.sparkMode) + frames.append(newImage) + + i += 1 + img.seek(img.tell() + 1) + except EOFError: + # end of sequence + pass + + return frames + # Creating a UDF on import can cause SparkContext issues sometimes. # decodeImage = udf(_decodeImage, imageSchema) +def filesToRDD(sc, path, numPartitions=None): + """ + Read files from a directory to an RDD. + + :param sc: SparkContext. + :param path: str, path to files. + :param numPartitions: int, number or partitions to use for reading files. + :return: RDD, with columns: (filePath: str, fileData: BinaryType) + """ + numPartitions = numPartitions or sc.defaultParallelism + rdd = sc.binaryFiles(path, minPartitions=numPartitions).repartition(numPartitions) + return rdd.map(lambda x: (x[0], bytearray(x[1]))) + + def filesToDF(sc, path, numPartitions=None): """ Read files from a directory to a DataFrame. :param sc: SparkContext. :param path: str, path to files. - :param numPartition: int, number or partitions to use for reading files. + :param numPartitions: int, number or partitions to use for reading files. :return: DataFrame, with columns: (filePath: str, fileData: BinaryType) """ - numPartitions = numPartitions or sc.defaultParallelism schema = StructType([StructField("filePath", StringType(), False), StructField("fileData", BinaryType(), False)]) - rdd = sc.binaryFiles(path, minPartitions=numPartitions).repartition(numPartitions) - rdd = rdd.map(lambda x: (x[0], bytearray(x[1]))) + rdd = filesToRDD(sc, path, numPartitions) return rdd.toDF(schema) @@ -235,3 +290,24 @@ def _readImages(imageDirectory, numPartition, sc): decodeImage = udf(_decodeImage, imageSchema) imageData = filesToDF(sc, imageDirectory, numPartitions=numPartition) return imageData.select("filePath", decodeImage("fileData").alias("image")) + + +def readGifs(gifDirectory, numPartition=None): + """ + Read a directory of GIFs (or a single GIF) into a DataFrame. + + :param sc: spark context + :param gifDirectory: str, file path. + :param numPartition: int, number or partitions to use for reading files. + :return: DataFrame, with columns: (filepath: str, image: imageSchema). + """ + return _readGifs(gifDirectory, numPartition, SparkContext.getOrCreate()) + + +def _readGifs(gifDirectory, numPartition, sc): + schema = StructType([StructField("filePath", StringType(), False), + StructField("frameNum", IntegerType(), False), + StructField("gifFrame", imageSchema, False)]) + gifsRDD = filesToRDD(sc, gifDirectory, numPartitions=numPartition) + framesRDD = gifsRDD.flatMap(lambda x: [(x[0], i, frame) for (i, frame) in _decodeGif(x[1])]) + return framesRDD.toDF(schema) From 6cd604fe4bf2e35433c84e1b1ceedb2eaee09000 Mon Sep 17 00:00:00 2001 From: Sean Quigley Date: Tue, 11 Jul 2017 21:59:40 -0400 Subject: [PATCH 2/6] fix issues with readGifs and start GIF unit tests --- .gitignore | 4 ++ python/sparkdl/image/imageIO.py | 10 ++-- python/tests/image/test_imageIO.py | 88 ++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 26ae9a84..b5e3cf28 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.log *.pyc build/*.jar +.coverage docs/_site docs/api @@ -18,6 +19,9 @@ src_managed/ project/boot/ project/plugins/project/ +# spark +metastore_db + # intellij .idea/ diff --git a/python/sparkdl/image/imageIO.py b/python/sparkdl/image/imageIO.py index 73c8be28..2c1554b7 100644 --- a/python/sparkdl/image/imageIO.py +++ b/python/sparkdl/image/imageIO.py @@ -211,14 +211,14 @@ def _decodeGif(gifData): try: img = Image.open(BytesIO(gifData)) except IOError: - return None + return [(None, None)] if img.tile and img.tile[0] and img.tile[0][0] == "gif": mode = pilModeLookup["RGB"] else: msg = "GIFs are not supported with image mode: {mode}" warn(msg.format(mode=img.mode)) - return None + return [(None, None)] frames = [] i = 0 @@ -232,7 +232,7 @@ def _decodeGif(gifData): newImgArray = np.asarray(newImg) newImage = imageArrayToStruct(newImgArray, mode.sparkMode) - frames.append(newImage) + frames.append((i, newImage)) i += 1 img.seek(img.tell() + 1) @@ -306,8 +306,8 @@ def readGifs(gifDirectory, numPartition=None): def _readGifs(gifDirectory, numPartition, sc): schema = StructType([StructField("filePath", StringType(), False), - StructField("frameNum", IntegerType(), False), - StructField("gifFrame", imageSchema, False)]) + StructField("frameNum", IntegerType(), True), + StructField("gifFrame", imageSchema, True)]) gifsRDD = filesToRDD(sc, gifDirectory, numPartitions=numPartition) framesRDD = gifsRDD.flatMap(lambda x: [(x[0], i, frame) for (i, frame) in _decodeGif(x[1])]) return framesRDD.toDF(schema) diff --git a/python/tests/image/test_imageIO.py b/python/tests/image/test_imageIO.py index 0b6f6e61..2be12662 100644 --- a/python/tests/image/test_imageIO.py +++ b/python/tests/image/test_imageIO.py @@ -173,4 +173,92 @@ def test_filesTODF(self): self.assertEqual(type(first.fileData), bytearray) +# Create dome fake GIF data to work with +def create_gif_data(): + # Random image-like data + array = np.random.randint(0, 256, (10, 11, 3), 'uint8') + + # Compress as png + imgFile = BytesIO() + PIL.Image.fromarray(array).save(imgFile, 'png') + imgFile.seek(0) + + # Get Png data as stream + gifData = imgFile.read() + return array, gifData + +gifArray, gifData = create_gif_data() + + +class BinaryGifFilesMock(object): + + defaultParallelism = 4 + + def __init__(self, sc): + self.sc = sc + + def binaryFiles(self, path, minPartitions=None): + gifsData = [["file/path", gifData], + ["another/file/path", gifData], + ["bad/gif", b"badGifData"] + ] + rdd = self.sc.parallelize(gifsData) + if minPartitions is not None: + rdd = rdd.repartition(minPartitions) + return rdd + +class TestReadGifs(SparkDLTestCase): + @classmethod + def setUpClass(cls): + super(TestReadGifs, cls).setUpClass() + cls.binaryFilesMock = BinaryGifFilesMock(cls.sc) + + @classmethod + def tearDownClass(cls): + super(TestReadGifs, cls).tearDownClass() + cls.binaryFilesMock = None + + def test_decodeGif(self): + badFrames = imageIO._decodeGif(b"xxx") + self.assertEqual(badFrames, [(None, None)]) + # gifFrames = imageIO._decodeGif(gifData) + # self.assertIsNotNone(gifFrames) + # self.assertEqual(len(gifFrames), len(imageIO.imageSchema.names)) + # for n in imageIO.imageSchema.names: + # imgRow[n] + + def test_gif_round_trip(self): + # Test round trip: array -> png -> sparkImg -> array + binarySchema = StructType([StructField("data", BinaryType(), False)]) + df = self.session.createDataFrame([[bytearray(pngData)]], binarySchema) + + # Convert to images + decImg = udf(imageIO._decodeImage, imageIO.imageSchema) + imageDF = df.select(decImg("data").alias("image")) + row = imageDF.first() + + testArray = imageIO.imageStructToArray(row.image) + self.assertEqual(testArray.shape, array.shape) + self.assertEqual(testArray.dtype, array.dtype) + self.assertTrue(np.all(array == testArray)) + + def test_readGifs(self): + # Test that reading + gifDF = imageIO._readGifs("some/path", 2, self.binaryFilesMock) + self.assertTrue("filePath" in gifDF.schema.names) + self.assertTrue("frameNum" in gifDF.schema.names) + self.assertTrue("gifFrame" in gifDF.schema.names) + + # The DF should have 2 images and 1 null. + self.assertEqual(gifDF.count(), 3) + validGifs = gifDF.filter(col("gifFrame").isNotNull()) + self.assertEqual(validGifs.count(), 2) + + img = validGifs.first().image + self.assertEqual(img.height, array.shape[0]) + self.assertEqual(img.width, array.shape[1]) + self.assertEqual(imageIO.imageType(img).nChannels, array.shape[2]) + self.assertEqual(img.data, array.tobytes()) + + # TODO: make unit tests for arrayToImageRow on arrays of varying shapes, channels, dtypes. From 4e98e4ab6649b3dcbb143fa4766b03cfbc7081b6 Mon Sep 17 00:00:00 2001 From: Sean Quigley Date: Wed, 19 Jul 2017 01:47:23 -0400 Subject: [PATCH 3/6] almost have unit tests fully working --- python/sparkdl/image/imageIO.py | 6 +-- python/tests/image/test_imageIO.py | 70 ++++++++++++++++-------------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/python/sparkdl/image/imageIO.py b/python/sparkdl/image/imageIO.py index 2c1554b7..8f76262f 100644 --- a/python/sparkdl/image/imageIO.py +++ b/python/sparkdl/image/imageIO.py @@ -216,8 +216,7 @@ def _decodeGif(gifData): if img.tile and img.tile[0] and img.tile[0][0] == "gif": mode = pilModeLookup["RGB"] else: - msg = "GIFs are not supported with image mode: {mode}" - warn(msg.format(mode=img.mode)) + warn("Image file does not appear to be a GIF") return [(None, None)] frames = [] @@ -228,7 +227,6 @@ def _decodeGif(gifData): img.putpalette(mypalette) newImg = Image.new("RGB", img.size) newImg.paste(img) - newImg.show() newImgArray = np.asarray(newImg) newImage = imageArrayToStruct(newImgArray, mode.sparkMode) @@ -252,7 +250,7 @@ def filesToRDD(sc, path, numPartitions=None): :param sc: SparkContext. :param path: str, path to files. :param numPartitions: int, number or partitions to use for reading files. - :return: RDD, with columns: (filePath: str, fileData: BinaryType) + :return: RDD tuple of: (filePath: str, fileData: BinaryType) """ numPartitions = numPartitions or sc.defaultParallelism rdd = sc.binaryFiles(path, minPartitions=numPartitions).repartition(numPartitions) diff --git a/python/tests/image/test_imageIO.py b/python/tests/image/test_imageIO.py index 2be12662..8172f3d9 100644 --- a/python/tests/image/test_imageIO.py +++ b/python/tests/image/test_imageIO.py @@ -173,21 +173,23 @@ def test_filesTODF(self): self.assertEqual(type(first.fileData), bytearray) -# Create dome fake GIF data to work with +# Create some fake GIF data to work with def create_gif_data(): - # Random image-like data - array = np.random.randint(0, 256, (10, 11, 3), 'uint8') + # Random GIF-like data + arrays = [np.random.randint(0, 256, (10, 11, 3), 'uint8') for _ in xrange(3)] + frames = [PIL.Image.fromarray(a) for a in arrays] - # Compress as png - imgFile = BytesIO() - PIL.Image.fromarray(array).save(imgFile, 'png') - imgFile.seek(0) + # Compress as GIF + gifFile = BytesIO() + frames[0].save(gifFile, 'gif', save_all=True, append_images=frames[1:]) + gifFile.seek(0) - # Get Png data as stream - gifData = imgFile.read() - return array, gifData + # Get GIF data as stream + gifData = gifFile.read() + return arrays, gifData gifArray, gifData = create_gif_data() +frameArray = gifArray[0] class BinaryGifFilesMock(object): @@ -207,6 +209,7 @@ def binaryFiles(self, path, minPartitions=None): rdd = rdd.repartition(minPartitions) return rdd + class TestReadGifs(SparkDLTestCase): @classmethod def setUpClass(cls): @@ -221,26 +224,27 @@ def tearDownClass(cls): def test_decodeGif(self): badFrames = imageIO._decodeGif(b"xxx") self.assertEqual(badFrames, [(None, None)]) - # gifFrames = imageIO._decodeGif(gifData) - # self.assertIsNotNone(gifFrames) - # self.assertEqual(len(gifFrames), len(imageIO.imageSchema.names)) - # for n in imageIO.imageSchema.names: - # imgRow[n] + gifFrames = imageIO._decodeGif(gifData) + self.assertIsNotNone(gifFrames) + self.assertEqual(len(gifFrames), 3) + self.assertEqual(len(gifFrames[0][1]), len(imageIO.imageSchema.names)) + for n in imageIO.imageSchema.names: + gifFrames[0][1][n] def test_gif_round_trip(self): - # Test round trip: array -> png -> sparkImg -> array + # Test round trip: array -> GIF frame -> sparkImg -> array binarySchema = StructType([StructField("data", BinaryType(), False)]) - df = self.session.createDataFrame([[bytearray(pngData)]], binarySchema) + df = self.session.sparkContext.parallelize([bytearray(gifData)]) - # Convert to images - decImg = udf(imageIO._decodeImage, imageIO.imageSchema) - imageDF = df.select(decImg("data").alias("image")) - row = imageDF.first() + # Convert to GIF frames + rdd = df.flatMap(lambda x: [f[1] for f in imageIO._decodeGif(x)]) + framesDF = rdd.toDF(imageIO.imageSchema) + row = framesDF.first() - testArray = imageIO.imageStructToArray(row.image) - self.assertEqual(testArray.shape, array.shape) - self.assertEqual(testArray.dtype, array.dtype) - self.assertTrue(np.all(array == testArray)) + testArray = imageIO.imageStructToArray(row) + self.assertEqual(testArray.shape, frameArray.shape) + self.assertEqual(testArray.dtype, frameArray.dtype) + # self.assertTrue(np.all(frameArray == testArray)) def test_readGifs(self): # Test that reading @@ -249,16 +253,16 @@ def test_readGifs(self): self.assertTrue("frameNum" in gifDF.schema.names) self.assertTrue("gifFrame" in gifDF.schema.names) - # The DF should have 2 images and 1 null. - self.assertEqual(gifDF.count(), 3) + # The DF should have 6 images (2 images, 3 frames each) and 1 null. + self.assertEqual(gifDF.count(), 7) validGifs = gifDF.filter(col("gifFrame").isNotNull()) - self.assertEqual(validGifs.count(), 2) + self.assertEqual(validGifs.count(), 6) - img = validGifs.first().image - self.assertEqual(img.height, array.shape[0]) - self.assertEqual(img.width, array.shape[1]) - self.assertEqual(imageIO.imageType(img).nChannels, array.shape[2]) - self.assertEqual(img.data, array.tobytes()) + frame = validGifs.first().gifFrame + self.assertEqual(frame.height, frameArray.shape[0]) + self.assertEqual(frame.width, frameArray.shape[1]) + self.assertEqual(imageIO.imageType(frame).nChannels, frameArray.shape[2]) + # self.assertEqual(frame.data, frameArray.tobytes()) # TODO: make unit tests for arrayToImageRow on arrays of varying shapes, channels, dtypes. From 1d82a6b52784fe143ba269e6f37b5453f40c74ca Mon Sep 17 00:00:00 2001 From: Sean Quigley Date: Sat, 26 Aug 2017 18:28:30 -0400 Subject: [PATCH 4/6] get round-trip GIF unit tests fully working --- python/sparkdl/image/imageIO.py | 5 +++-- python/tests/image/test_imageIO.py | 16 +++++++++------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/python/sparkdl/image/imageIO.py b/python/sparkdl/image/imageIO.py index 8f76262f..2ea28ede 100644 --- a/python/sparkdl/image/imageIO.py +++ b/python/sparkdl/image/imageIO.py @@ -213,7 +213,7 @@ def _decodeGif(gifData): except IOError: return [(None, None)] - if img.tile and img.tile[0] and img.tile[0][0] == "gif": + if img.format.lower() == "gif": mode = pilModeLookup["RGB"] else: warn("Image file does not appear to be a GIF") @@ -224,7 +224,8 @@ def _decodeGif(gifData): mypalette = img.getpalette() try: while True: - img.putpalette(mypalette) + if not img.getpalette() and mypalette: + img.putpalette(mypalette) newImg = Image.new("RGB", img.size) newImg.paste(img) diff --git a/python/tests/image/test_imageIO.py b/python/tests/image/test_imageIO.py index 8172f3d9..1dbfe771 100644 --- a/python/tests/image/test_imageIO.py +++ b/python/tests/image/test_imageIO.py @@ -26,7 +26,7 @@ from sparkdl.image import imageIO from ..tests import SparkDLTestCase -# Create dome fake image data to work with +# Create some fake image data to work with def create_image_data(): # Random image-like data array = np.random.randint(0, 256, (10, 11, 3), 'uint8') @@ -176,17 +176,19 @@ def test_filesTODF(self): # Create some fake GIF data to work with def create_gif_data(): # Random GIF-like data - arrays = [np.random.randint(0, 256, (10, 11, 3), 'uint8') for _ in xrange(3)] - frames = [PIL.Image.fromarray(a) for a in arrays] + arrays2D = [np.random.randint(0, 256, (10, 11), 'uint8') for _ in xrange(3)] + arrays3D = [np.dstack((a, a, a)) for a in arrays2D] + # Create frames in P mode because Pillow always reads GIFs as P or L images + frames = [PIL.Image.fromarray(a, mode='P') for a in arrays2D] # Compress as GIF gifFile = BytesIO() - frames[0].save(gifFile, 'gif', save_all=True, append_images=frames[1:]) + frames[0].save(gifFile, 'gif', save_all=True, append_images=frames[1:], optimize=False) gifFile.seek(0) # Get GIF data as stream gifData = gifFile.read() - return arrays, gifData + return arrays3D, gifData gifArray, gifData = create_gif_data() frameArray = gifArray[0] @@ -244,7 +246,7 @@ def test_gif_round_trip(self): testArray = imageIO.imageStructToArray(row) self.assertEqual(testArray.shape, frameArray.shape) self.assertEqual(testArray.dtype, frameArray.dtype) - # self.assertTrue(np.all(frameArray == testArray)) + self.assertTrue(np.all(frameArray == testArray)) def test_readGifs(self): # Test that reading @@ -262,7 +264,7 @@ def test_readGifs(self): self.assertEqual(frame.height, frameArray.shape[0]) self.assertEqual(frame.width, frameArray.shape[1]) self.assertEqual(imageIO.imageType(frame).nChannels, frameArray.shape[2]) - # self.assertEqual(frame.data, frameArray.tobytes()) + self.assertEqual(frame.data, frameArray.tobytes()) # TODO: make unit tests for arrayToImageRow on arrays of varying shapes, channels, dtypes. From c94efedeba5c7c3ca241e9ac530313c3d0f9cfa5 Mon Sep 17 00:00:00 2001 From: Sean Quigley Date: Sat, 26 Aug 2017 18:38:53 -0400 Subject: [PATCH 5/6] pull GIF schema out of function --- python/sparkdl/image/imageIO.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/sparkdl/image/imageIO.py b/python/sparkdl/image/imageIO.py index 2ea28ede..5cb0aba6 100644 --- a/python/sparkdl/image/imageIO.py +++ b/python/sparkdl/image/imageIO.py @@ -34,6 +34,10 @@ StructField("nChannels", IntegerType(), False), StructField("data", BinaryType(), False)]) +gifSchema = StructType([StructField("filePath", StringType(), False), + StructField("frameNum", IntegerType(), True), + StructField("gifFrame", imageSchema, True)]) + # ImageType class for holding metadata about images stored in DataFrames. # fields: @@ -304,9 +308,6 @@ def readGifs(gifDirectory, numPartition=None): def _readGifs(gifDirectory, numPartition, sc): - schema = StructType([StructField("filePath", StringType(), False), - StructField("frameNum", IntegerType(), True), - StructField("gifFrame", imageSchema, True)]) gifsRDD = filesToRDD(sc, gifDirectory, numPartitions=numPartition) framesRDD = gifsRDD.flatMap(lambda x: [(x[0], i, frame) for (i, frame) in _decodeGif(x[1])]) - return framesRDD.toDF(schema) + return framesRDD.toDF(gifSchema) From f86ac03627f953ce7a6116f54a21198e5cba0261 Mon Sep 17 00:00:00 2001 From: Sean Quigley Date: Sun, 27 Aug 2017 14:10:21 -0400 Subject: [PATCH 6/6] remove xrange for python 3 compatibility --- python/tests/image/test_imageIO.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/image/test_imageIO.py b/python/tests/image/test_imageIO.py index 1dbfe771..9f47b72b 100644 --- a/python/tests/image/test_imageIO.py +++ b/python/tests/image/test_imageIO.py @@ -176,7 +176,7 @@ def test_filesTODF(self): # Create some fake GIF data to work with def create_gif_data(): # Random GIF-like data - arrays2D = [np.random.randint(0, 256, (10, 11), 'uint8') for _ in xrange(3)] + arrays2D = [np.random.randint(0, 256, (10, 11), 'uint8') for _ in range(3)] arrays3D = [np.dstack((a, a, a)) for a in arrays2D] # Create frames in P mode because Pillow always reads GIFs as P or L images frames = [PIL.Image.fromarray(a, mode='P') for a in arrays2D]