我这里想到一个方法。请参考是否合适。
思路如下:
1、排序(各列的宽度是固定的)
2、映射成类似(abc,(1,31))的数据结构
3、以第一列为key,进行groupByKey操作。形成类似(abc,CompactBuffer((1,31),(2,25),(3,24))))这样的数据结构。
4、再进行过滤操作。在自定义的函数中,判断第三列是否递减。过滤出递减的元素。
5、提取出满足条件的记录,对应的key,输出key
代码如下:
importorg.apache.spark.util.collection.CompactBuffer
defmyfunc(ele:((String,Iterable[(String,String)]))):Boolean={
variter=ele._2.iterator
varpre=iter.next
varisSorted=true
while(iter.hasNext&&isSorted){
valcur=iter.next;
if(cur._2.toInt>=pre._2.toInt){
isSorted=false
}
}
isSorted
}
varfileRDD=sc.textFile("file:///tmp/test.data")
fileRDD.sortBy(line=>line,true).
map(line=>line.toString().split("`")).
map(arr=>(arr(0),(arr(1),arr(2)))).
groupByKey.
filter(myfunc).
keys.
foreach(println) |