I have generated a sequence file of about 900 MB that contains 8 records; 6 of them are around 128 MB each (the HDFS block size).
In PySpark, I read it as follows (the key and value are both custom Java classes):
rdd = sc.sequenceFile("hdfs:///Test.seq", keyClass="ChunkID", valueClass="ChunkData", keyConverter="KeyToChunkConverter", valueConverter="DataToChunkConverter")
rdd.getNumPartitions() shows that there are 7 partitions. I then try to open it as follows:
import logging

def open_map():
    def open_map_nested(key_value):
        try:
            # key: ChunkID, data: ChunkData
            key, data = key_value
            if key[0] == 0:
                return [["if", "if", "if"]]
            else:
                return [["else", "else", "else"]]
        except Exception as e:
            logging.exception(e)
            return [["None", "None", "None"], ["None", "None", "None"]]
    return open_map_nested
result = rdd.flatMap(open_map()).collect()
However, the following MemoryError occurs:
File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
It seems something goes wrong when the objects are deserialized (pickle.loads), and the processing time of the program above is very long. (By the way, the executor memory is 3 GB on my system.)
Update:
Thanks for the help! I have tried using count instead of collect, but the problem remains. I think the problem occurs on the executor when reading each input split, so I checked my executor configuration again to figure out the cause. The settings are --executor-memory 2500M and --conf spark.yarn.executor.memoryOverhead=512.
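For reference, a rough sketch of the count-based attempt (same open_map as above; the variable name is arbitrary):

# count() instead of collect(): nothing is shipped back to the driver,
# but the MemoryError on the Python worker still shows up.
num_records = rdd.flatMap(open_map()).count()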
According to the calculation in this article, the effective MemoryStore capacity is around 1.2 GB, which the log file also shows:
17/03/30 17:15:57 INFO memory.MemoryStore: MemoryStore started with capacity 1153.3 MB
17/03/30 17:15:58 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.100.5:34875
17/03/30 17:15:58 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
.....
17/03/30 17:16:26 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 42.2 KB, free 1153.3 MB)
17/03/30 17:16:26 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 18 ms
17/03/30 17:16:26 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 545.5 KB, free 1152.8 MB)
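That calculation comes out roughly as follows (a sketch assuming the Spark 2.x defaults of spark.memory.fraction = 0.6 and about 300 MB of reserved memory; the exact value depends on the max heap the JVM actually reports, which is why the log shows 1153.3 MB rather than the nominal figure):

# Rough estimate of the executor's unified memory pool (defaults assumed).
executor_memory_mb = 2500      # --executor-memory 2500M
reserved_mb = 300              # reserved system memory
memory_fraction = 0.6          # spark.memory.fraction default in Spark 2.x
print((executor_memory_mb - reserved_mb) * memory_fraction)   # ~1320 MB nominal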
I also found a discussion about the PySpark serializer here, but changing the batch size to unlimited does not help.
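Concretely, what I tried was along these lines (the appName is arbitrary; batchSize=-1 is my reading of "unlimited", as documented for the SparkContext constructor, and sequenceFile() also accepts its own batchSize argument):

from pyspark import SparkContext
from pyspark.serializers import PickleSerializer

# batchSize=-1 is documented as an unlimited batch size for the context;
# the sequenceFile() call itself is unchanged apart from that.
sc = SparkContext(appName="read_seq", serializer=PickleSerializer(), batchSize=-1)
rdd = sc.sequenceFile("hdfs:///Test.seq",
                      keyClass="ChunkID", valueClass="ChunkData",
                      keyConverter="KeyToChunkConverter",
                      valueConverter="DataToChunkConverter")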
My questions are:
- Why is there a memory error when the MemoryStore capacity is 1.2 GB and each input split (record) is only about 128 MB?
- Is there a recommended way to read large sequence files in PySpark (to reduce the time needed to open the file, or to avoid the memory error)?
Any suggestions would be highly appreciated!