python - How to sequentially aggregate the content of a dask Bag?


I would like to sequentially aggregate the content of a partitioned collection with an aggregation function that is not associative, so I cannot use bag.fold or bag.reduction.
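To see why a non-associative function rules out a partitioned fold, here is a plain-Python sketch (no dask involved). It uses subtraction as a stand-in non-associative operation. The three-way split and the "reduce each partition, then combine the partial results" shape are a simplified model of a two-level reduction, not dask's actual implementation.

```python
from functools import reduce
from operator import sub

data = list(range(10))
# Assumed split, matching from_sequence(range(10), npartitions=3).
partitions = [data[0:4], data[4:8], data[8:10]]

# Sequential left fold: (((0 - 1) - 2) - ...) - 9
sequential = reduce(sub, data)

# Simplified two-level fold: reduce each partition independently,
# then combine the per-partition results.
partials = [reduce(sub, p) for p in partitions]
tree = reduce(sub, partials)

print(sequential)  # -45
print(partials)    # [-6, -14, -1]
print(tree)        # 9
```

The two results differ, which is exactly why a sequential aggregation cannot be expressed as a per-partition reduce followed by a combine step when the function is not associative.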

There is bag.accumulate, which seems to do this operation, but it returns a bag of all the intermediate results instead of only the final aggregate:

```python
>>> import dask.bag as db
>>>
>>> def collect(acc, e):
...     if acc is None:
...         acc = list()
...     acc.append(e)
...     return acc
...
>>> b = db.from_sequence(range(10), npartitions=3)
>>> b.accumulate(collect, initial=None).compute()
[None,
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
```
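A likely reason the output repeats the same list within each partition is that collect mutates and returns its accumulator, so every running value refers to one and the same list object. The plain-Python sketch below shows this with a hypothetical running helper that mimics the shape of the accumulate output (initial value first, then one entry per element); it does not use dask.

```python
def collect(acc, e):
    # Same step function as in the question: appends e in place.
    if acc is None:
        acc = list()
    acc.append(e)
    return acc


def running(binop, seq, initial):
    # Hypothetical helper: record the initial value and every running
    # accumulator, similar in shape to what accumulate returns.
    acc = initial
    out = [acc]
    for e in seq:
        acc = binop(acc, e)
        out.append(acc)
    return out


steps = running(collect, range(4), None)
print(steps[0])                               # None
print(all(s is steps[1] for s in steps[1:]))  # True: one shared list object
print(steps[-1])                              # [0, 1, 2, 3]
```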

Basically, I am only interested in the last element of the accumulate output, and I don't want to keep a copy of the intermediate steps in memory.
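In plain Python, the operation being asked for is a left fold that threads a single accumulator through the partitions in order. The sketch below uses a hypothetical sequential_fold helper over an in-memory list of partitions (no dask) to show those semantics: only the current accumulator is kept, never the per-element history.

```python
from functools import reduce


def collect(acc, e):
    # The question's non-associative step: build a list in arrival order.
    if acc is None:
        acc = list()
    acc.append(e)
    return acc


def sequential_fold(partitions, binop, initial=None):
    # Hypothetical helper: fold each partition in turn, starting from the
    # previous partition's result; no intermediate results are stored.
    acc = initial
    for part in partitions:
        acc = reduce(binop, part, acc)
    return acc


partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
print(sequential_fold(partitions, collect))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```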

Bag doesn't have a sequential reduction operation, but it could. A simple way to accomplish this today is to use accumulate as you have above and ask only for the last element of the last partition. You can do this relatively easily by converting your bag to delayed values using bag.to_delayed:

```python
acc = b.accumulate(collect, initial=None)
partitions = acc.to_delayed()
partitions[-1][-1].compute()
```
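If you would rather not build the accumulate output at all, one possible alternative (a sketch, not an existing Bag method) is to chain one dask.delayed task per partition and pass only the running accumulator from task to task. It relies on Bag.to_delayed and dask.delayed; fold_partition and sequential_reduce are hypothetical names introduced here.

```python
import functools

import dask
import dask.bag as db


def collect(acc, e):
    # The question's non-associative step function.
    if acc is None:
        acc = list()
    acc.append(e)
    return acc


def fold_partition(acc, partition, binop):
    # Left-fold one partition, starting from the previous partition's result.
    return functools.reduce(binop, partition, acc)


def sequential_reduce(bag, binop, initial=None):
    # Hypothetical helper: each delayed task receives only the running
    # accumulator and one partition, never the per-element history.
    acc = initial
    for part in bag.to_delayed():
        acc = dask.delayed(fold_partition)(acc, part, binop)
    return acc


b = db.from_sequence(range(10), npartitions=3)
result = sequential_reduce(b, collect).compute()
print(result)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because each fold task depends on the previous one, the fold steps themselves run strictly one after another; that ordering is what makes the result correct for a non-associative function.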
