The following code is uploaded to a Kinesis Data Analytics application:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types
from pyflink.common import Row

env = StreamExecutionEnvironment.get_execution_environment()

def my_map_func(row):
    items = [Row('1'), Row('2'), Row('3')]
    return Row(items)

ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

ds2 = ds.map(my_map_func, output_type=Types.ROW([
    Types.LIST(
        Types.ROW([
            Types.STRING(),
        ])
    ),
]))

with ds2.execute_and_collect() as results:
    for result in results:
        print(result)
Executing it on the task managers results in the following error:
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:89)
at org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:126)
at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:345)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:72)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:137)
I have also reproduced the same error in Kinesis Data Analytics Studio.