python - How to sequentially aggregate the content of a dask Bag?


I need to sequentially aggregate the content of a partitioned collection. The aggregation function is not associative, therefore I cannot use Bag.fold or Bag.reduction.
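To make the associativity problem concrete, here is a minimal pure-Python sketch, with no dask involved. The `step` function and the three hand-made partitions are hypothetical stand-ins chosen for illustration; they only mimic roughly what a per-partition fold followed by a combine step would do.

```python
from functools import reduce


def step(acc, e):
    # Hypothetical non-associative aggregation:
    # step(step(a, b), c) != step(a, step(b, c)) in general.
    return acc * 2 + e


data = list(range(10))
# Hand-made partitions standing in for the partitions of a bag.
partitions = [data[0:4], data[4:8], data[8:10]]

# Strictly sequential aggregation over all elements.
sequential = reduce(step, data, 0)

# Fold each partition independently, then combine the partial results.
per_partition = [reduce(step, part, 0) for part in partitions]
combined = reduce(step, per_partition)

print(sequential)  # 1013
print(combined)    # 211
```

The two results differ, which is why a fold that aggregates partitions independently is not an option here.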

There is Bag.accumulate, which seems to be the right operation, but it returns a bag of per-partition intermediate results instead of just the final aggregate:

>>> import dask.bag as db
>>>
>>> def collect(acc, e):
...     if acc is None:
...         acc = list()
...     acc.append(e)
...     return acc
...
>>> b = db.from_sequence(range(10), npartitions=3)
>>> b.accumulate(collect, initial=None).compute()
[None,
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

Basically, I am only interested in the last element of the accumulate output, and I don't want to keep a copy of the intermediate steps in memory.

Bag doesn't have a sequential reduction operation, but it could. A simple way to accomplish this today is to use accumulate as you have above, but only ask for the last element of the last partition. You can do this relatively easily by converting the bag to delayed values using Bag.to_delayed:

acc = b.accumulate(collect, initial=None)
partitions = acc.to_delayed()
partitions[-1][-1].compute()
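If you would rather not materialize the accumulated elements at all, one possible alternative is to chain the partitions by hand with dask.delayed. This is only a sketch and not part of the answer above: `step` and `fold_partition` are hypothetical helpers, and `step` is just an example of a non-associative function.

```python
from functools import reduce

import dask
import dask.bag as db


def step(acc, e):
    # Hypothetical non-associative aggregation, for illustration only.
    return acc * 2 + e


def fold_partition(acc, partition):
    # Fold one whole partition into the running value, left to right.
    return reduce(step, partition, acc)


b = db.from_sequence(range(10), npartitions=3)

# Chain the partitions: each task takes the previous task's result as its
# starting value, so the partitions are processed strictly in order.
acc = 0
for partition in b.to_delayed():
    acc = dask.delayed(fold_partition)(acc, partition)

result = acc.compute()
print(result)  # 1013
```

Each task returns a single running value rather than a list of intermediate results, at the cost of giving up any parallelism across partitions.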
