python - How to sequentially aggregate the content of a dask Bag? -
i sequentially aggregate content of partitioned collection aggregation function not associative, therefore cannot use bag.fold
or bag.reduction
.
there bag.accumulate
seems operation, returns bag per-partition intermediate results instead of final aggregate:
>>> import dask.bag db >>> >>> def collect(acc, e): ... if acc none: ... acc = list() ... acc.append(e) ... return acc ... >>> b = db.from_sequence(range(10), npartitions=3) >>> b.accumulate(collect, initial=none).compute() [none, [0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
basically interested in last element of accumulate
output , don't want keep copy of intermediate steps in memory.
bag doesn't have sequential reduction operation, could. simple way accomplish today use use accumulate
have above, ask last element of last partition. can relatively converting bag delayed values using bag.to_delayed
acc = b.accumulate(collect, initial=none) partitions = acc.to_delayed() partitions[-1][-1].compute()
Comments
Post a Comment