
I have generated a sequence file of about 900 MB that contains 8 records; 6 of those records are each around 128 MB (the HDFS block size).

In PySpark, I read it as follows (the key and value are both custom Java classes):

rdd = sc.sequenceFile("hdfs:///Test.seq", keyClass="ChunkID", valueClass="ChunkData", keyConverter="KeyToChunkConverter", valueConverter="DataToChunkConverter")

rdd.getNumPartitions() shows that there are 7 partitions, and I try to process it as follows:

import logging

def open_map():
    def open_map_nested(key_value):
        try:
            # key is a ChunkID, data is a ChunkData (produced by the custom converters)
            key, data = key_value
            if key[0] == 0:
                return [["if", "if", "if"]]
            else:
                return [["else", "else", "else"]]
        except Exception as e:  # works on both Python 2 and 3
            logging.exception(e)
            return [["None", "None", "None"], ["None", "None", "None"]]
    return open_map_nested

result = rdd.flatMap(open_map()).collect()

However, a memory error occurs:

  File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/home/wong/spark_install/spark-2.0.2-bin-hadoop2.7/python/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
MemoryError

It seems there is a problem when deserializing the objects, and the processing time of the above program is very long. (By the way, the executor memory is 3 GB on my system.)


  • Updated:

Thanks for the help! I have tried using count instead, but the problem remains. I think the problem occurs on the executor node when reading each input split, so I checked my executor configuration again to figure out the cause. The settings are --executor-memory 2500M and --conf spark.yarn.executor.memoryOverhead=512. According to the calculation in this article, the effective MemoryStore capacity is around 1.2 GB (a rough back-of-envelope is sketched after the log below), which also shows up in the log file:

17/03/30 17:15:57 INFO memory.MemoryStore: MemoryStore started with capacity 1153.3 MB
17/03/30 17:15:58 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.100.5:34875
17/03/30 17:15:58 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
.....
17/03/30 17:16:26 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 42.2 KB, free 1153.3 MB)
17/03/30 17:16:26 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 18 ms
17/03/30 17:16:26 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 545.5 KB, free 1152.8 MB)
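For reference, a rough back-of-envelope for that ~1.2 GB figure, assuming the Spark 2.x unified memory model with its defaults (spark.memory.fraction = 0.6 and 300 MB of reserved memory); the logged value is a bit lower because the JVM reports somewhat less usable heap than -Xmx:

# Rough estimate of the MemoryStore (unified memory) capacity, assuming
# Spark 2.x defaults: spark.memory.fraction = 0.6 and 300 MB reserved.
executor_heap_mb = 2500                  # --executor-memory 2500M
reserved_mb = 300                        # fixed reservation in the unified model
memory_fraction = 0.6                    # spark.memory.fraction default

unified_region_mb = (executor_heap_mb - reserved_mb) * memory_fraction
print(unified_region_mb)                 # ~1320 MB; usable JVM heap is smaller,
                                         # which roughly matches the logged 1153.3 MB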

I also found a discussion about the PySpark serializer here, but changing the batch size to unlimited does not help.
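For clarity, this is the kind of change I mean: sc.sequenceFile accepts a batchSize argument (0 lets Spark choose, -1 means unlimited). Setting it to unlimited did not help; a variant with batchSize=1, so each record is pickled on its own, would look like the sketch below, though I have not verified that it behaves differently:

# Read with an explicit batch size of 1 so the worker pickles one record
# at a time instead of batching them (untested whether this avoids the error).
rdd = sc.sequenceFile("hdfs:///Test.seq",
                      keyClass="ChunkID",
                      valueClass="ChunkData",
                      keyConverter="KeyToChunkConverter",
                      valueConverter="DataToChunkConverter",
                      batchSize=1)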


My questions are:

  • Why is there a memory error when the MemoryStore capacity is 1.2 GB and each input split (record) is only 128 MB?
  • Is there a recommended way to read large sequence files in PySpark (to reduce the time to open the file, or to avoid the memory error)?

Any suggestions would be highly appreciated!

  • You are collecting your data after reading it, why ? – eliasah Mar 28 '17 at 07:21
  • Just want to use an action to trigger the reading of sequence file. In my use case I will perform some feature extractions on the record. – steve Mar 29 '17 at 00:52
  • You can perform a count instead. Collecting "large" data isn't recommended because it overwhelms the driver by pulling all the data into the master and causes the error that you are getting. So remove the collect and in a new instruction use count (per example) – eliasah Mar 29 '17 at 07:21
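A minimal sketch of the change the comment suggests, replacing collect with count so that nothing is pulled back to the driver:

# Trigger the read with an action that does not ship the results to the driver.
n = rdd.flatMap(open_map()).count()
print(n)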

0 Answers