python - How to sequentially aggregate the content of a dask Bag?


I would like to sequentially aggregate the content of a partitioned collection, but my aggregation function is not associative, so I cannot use `bag.fold` or `bag.reduction`.
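To see why `fold` and `reduction` are ruled out: they reduce each partition independently and then combine the partial results, which matches a single left-to-right pass only when the operation is associative. Below is a simplified, stdlib-only model of that behaviour (not dask's actual implementation; the helper names are hypothetical). It uses subtraction as the non-associative operation and the same three-way split that `db.from_sequence(range(10), npartitions=3)` produces in the output further down.

```python
import functools
import operator

data = list(range(10))
# Same split as db.from_sequence(range(10), npartitions=3): sizes 4, 4, 2.
partitions = [data[0:4], data[4:8], data[8:10]]


def sequential_result(binop, items):
    # One strict left-to-right pass over all elements.
    return functools.reduce(binop, items)


def partitioned_result(binop, parts):
    # Reduce each partition on its own, then combine the partial results,
    # roughly what a parallel fold/reduction does.
    partials = [functools.reduce(binop, p) for p in parts]
    return functools.reduce(binop, partials)


seq = sequential_result(operator.sub, data)
par = partitioned_result(operator.sub, partitions)
print(seq, par)  # -45 9
```

With an associative operation such as `operator.add`, both functions return 45; with subtraction they disagree, which is why a sequential pass is needed.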

There is `bag.accumulate`, which seems to be the right operation, but it returns a bag containing every intermediate result instead of only the final aggregate:

```python
>>> import dask.bag as db
>>>
>>> def collect(acc, e):
...     if acc is None:
...         acc = list()
...     acc.append(e)
...     return acc
...
>>> b = db.from_sequence(range(10), npartitions=3)
>>> b.accumulate(collect, initial=None).compute()
[None,
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
```

I am only interested in the last element of the `accumulate` output, and I don't want to keep a copy of the intermediate steps in memory.

Bag doesn't have a sequential reduction operation, though it could. A simple way to accomplish this today is to use `accumulate` as you have above and then ask only for the last element of the last partition. You can do this relatively easily by converting the bag to delayed values using `bag.to_delayed`:

```python
acc = b.accumulate(collect, initial=None)
partitions = acc.to_delayed()   # one Delayed object per partition
partitions[-1][-1].compute()    # last element of the last partition
```
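If you would rather not build the intermediate bag at all, one alternative is to chain `dask.delayed` calls over the partitions returned by `to_delayed`, folding each whole partition into the running accumulator with `functools.reduce`. This is a sketch, not a built-in Bag method: `sequential_reduce` is a hypothetical helper name, and it assumes the bag has at least one partition. Only the running accumulator is handed from one task to the next, so no per-element intermediate results are stored, but the work is inherently serial.

```python
import functools

import dask
import dask.bag as db


def collect(acc, e):
    # Aggregator from the question: appends elements in order.
    if acc is None:
        acc = []
    acc.append(e)
    return acc


def sequential_reduce(bag, binop, initial):
    """Hypothetical helper: fold `binop` over `bag` strictly left to right.

    Assumes the bag has at least one partition, so the result is a Delayed.
    """
    acc = initial
    for part in bag.to_delayed():
        # Each task receives the previous accumulator and one whole partition,
        # so partition i + 1 is folded only after partition i has finished.
        acc = dask.delayed(functools.reduce)(binop, part, acc)
    return acc


b = db.from_sequence(range(10), npartitions=3)
result = sequential_reduce(b, collect, None)
# The synchronous scheduler keeps the demo simple; the chain is serial anyway.
print(result.compute(scheduler="synchronous"))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because each partition is passed to `functools.reduce` as a whole, `binop` sees the elements in their original order, which is what a non-associative aggregation needs.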
