python - How to sequentially aggregate the content of a dask Bag?


I would like to sequentially aggregate the content of a partitioned collection. The aggregation function is not associative, therefore I cannot use Bag.fold or Bag.reduction.
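To illustrate why associativity matters here, the following plain-Python sketch (no dask involved) compares a strictly sequential reduction with a "reduce each partition, then combine" strategy, which is roughly what a fold-style reduction does. Subtraction stands in for a non-associative function, and the partition boundaries are an assumption chosen to mirror a 3-partition split of range(10).

```python
from functools import reduce


def subtract(acc, e):
    # Subtraction is not associative: (a - b) - c != a - (b - c) in general.
    return acc - e


data = list(range(10))
# Assumed split into 3 partitions, for illustration only.
partitions = [data[0:4], data[4:8], data[8:10]]

# Strictly sequential reduction over all elements, left to right.
sequential = reduce(subtract, data)  # -45

# Fold-style: reduce each partition independently, then combine the partials.
per_partition = [reduce(subtract, p) for p in partitions]  # [-6, -14, -1]
combined = reduce(subtract, per_partition)  # 9

print(sequential, combined)
```

The two results differ, which is why a per-partition reduction is only safe when the function is associative.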

There is Bag.accumulate, which seems to do this operation, but it returns a bag containing the intermediate results of every step instead of only the final aggregate:

>>> import dask.bag as db
>>>
>>> def collect(acc, e):
...     if acc is None:
...         acc = list()
...     acc.append(e)
...     return acc
...
>>> b = db.from_sequence(range(10), npartitions=3)
>>> b.accumulate(collect, initial=None).compute()
[None,
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

Basically, I am only interested in the last element of the accumulate output, and I don't want to keep a copy of the intermediate steps in memory.
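The distinction between "every intermediate step" and "only the final aggregate" can be seen with the standard library alone. This is a minimal sketch using itertools.accumulate and functools.reduce as stand-ins for the two behaviours; it is not dask code, and this collect variant returns a new list at each step (an assumption made so the intermediate results stay distinct).

```python
from functools import reduce
from itertools import accumulate


def collect(acc, e):
    # Return a new list each time so every intermediate result is distinct.
    return acc + [e]


data = range(5)

# accumulate yields the initial value plus one result per element.
steps = list(accumulate(data, collect, initial=[]))

# reduce walks the same sequence but returns only the last result.
final = reduce(collect, data, [])

print(len(steps))  # 6
print(final)       # [0, 1, 2, 3, 4]
```

The final value is the same in both cases; the difference is only in how many intermediate results are kept around.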

Bag doesn't currently have a sequential reduction operation, but it could. A simple way to accomplish this today is to use accumulate as you have above, but ask only for the last element of the last partition. We can do this relatively easily by converting the bag to delayed values using Bag.to_delayed:

acc = b.accumulate(collect, initial=None)
partitions = acc.to_delayed()   # one delayed value per partition
partitions[-1][-1].compute()    # last element of the last partition
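For intuition, here is a plain-Python sketch of what a sequential reduction over partitions would compute: the accumulator is threaded from one partition to the next and only the final value is returned. The helpers reduce_partition and sequential_reduce are hypothetical names introduced for illustration, not part of dask, and the partitions are assumed to be ordinary iterables.

```python
from functools import reduce


def collect(acc, e):
    if acc is None:
        acc = []
    acc.append(e)
    return acc


def reduce_partition(acc, partition):
    # Hypothetical helper: fold one partition into the running accumulator.
    return reduce(collect, partition, acc)


def sequential_reduce(partitions, initial=None):
    # Hypothetical helper: visit partitions in order, carrying the accumulator.
    acc = initial
    for partition in partitions:
        acc = reduce_partition(acc, partition)
    return acc


# Assumed 3-way split of range(10), for illustration only.
partitions = [range(0, 4), range(4, 8), range(8, 10)]
result = sequential_reduce(partitions)
print(result)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because each step depends on the previous one, the partitions are necessarily processed one after another; nothing here runs in parallel.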
