fire2a.downstream_protection_value

DownStream Protection Value Algorithm can be reduced to recursively counting out nodes in a directed tree graph; When providing node values, recursively summing them. In the context of Cell2Fire simulator, the graph is a (or many) fire propagation tree(s), represented in a (multi) directed graph.

Its main idea is that upstream cells are priority over downstream cells because of the calculated fire propagations; So if you protect an upstream cell, fire won't get to the downstream cells; pondered by the protection values

Its inputs are:

  • A directed graph(*), passed as a 'Messages.csv' file, with 'in_node', 'out_node', 'weight' columns
  • [optional] a raster with the values to protect (any Non-Data, any Value types) As the aggregation function is sum, means bigger values are more protection than smaller or negative values.

Its output is:

  • A raster represeting the graph with added values (or number of cells) downstream (or out nodes, recursively).

(*): If it is not a tree, it will be reduced to a tree by the shortest path algorithm

This code:

  • Reads all Messages[0-9]+.csv files from a directory ([0-9]+ represents any integer number)
  • Reads the protection values from a gdal compatible raster file
  • Calculates the downstream protection value algorithm serially for windows, and parallel for linux
  • Is hooked up to usage_template/downstream_protection_value.ipynb for graphical exploration
  1#!/bin/env python3
  2__author__ = "Fernando Badilla"
  3__revision__ = "$Format:%H$"
  4__doc__ = """
  5DownStream Protection Value Algorithm can be reduced to recursively counting out nodes in a directed tree graph; When providing node values, recursively summing them. In the context of Cell2Fire simulator, the graph is a (or many) fire propagation tree(s), represented in a (multi) directed graph.
  6
  7Its main idea is that upstream cells are priority over downstream cells because of the calculated fire propagations; So if you protect an upstream cell, fire won't get to the downstream cells; pondered by the protection values
  8
  9Its inputs are:
 10
 11* A directed graph(*), passed as a 'Messages.csv' file, with 'in_node', 'out_node', 'weight' columns
 12* [optional] a raster with the values to protect (any Non-Data, any Value types) As the aggregation function is sum, means bigger values are more protection than smaller or negative values.
 13
 14Its output is:
 15
 16* A raster represeting the graph with added values (or number of cells) downstream (or out nodes, recursively).
 17
 18(*): If it is not a tree, it will be reduced to a tree by the shortest path algorithm
 19
 20This code:
 21
 22* Reads all Messages[0-9]+.csv files from a directory ([0-9]+ represents any integer number)
 23* Reads the protection values from a gdal compatible raster file
 24* Calculates the downstream protection value algorithm serially for windows, and parallel for linux
 25* Is hooked up to usage_template/downstream_protection_value.ipynb for graphical exploration
 26"""
 27import logging
 28import re
 29import sys
 30from pathlib import Path
 31
 32import networkx as nx
 33import numpy as np
 34from matplotlib import pyplot as plt
 35from networkx import DiGraph
 36from numpy import ndarray
 37from osgeo import gdal
 38
 39from . import setup_logger
 40from .raster import id2xy as r_id2xy
 41from .raster import read_raster
 42
 43logger = logging.getLogger(__name__)
 44
 45
 46def id2xy(idx, w=40, h=40):
 47    """idx: index, w: width, h:height"""
 48    return r_id2xy(idx - 1, w, h)
 49
 50
 51# def id2xy(idx, w=40, h=40):
 52#     """idx: index, w: width, h:height"""
 53#     idx -= 1
 54#     return idx % w, idx // w
 55
 56# def read_raster(filename: str, band: int = 1, data: bool = True, info: bool = True) -> tuple[np.ndarray, dict]:
 57
 58
 59def single_simulation_downstream_protection_value(msgfile="MessagesFile01.csv", pvfile="py.asc"):
 60    """load one diGraph count succesors"""
 61    msgG, root = digraph_from_messages(msgfile)
 62    treeG = shortest_propagation_tree(msgG, root)
 63    pv, W, H = get_flat_pv(pvfile)
 64    #
 65    dpv = np.zeros(pv.shape, dtype=pv.dtype)
 66    i2n = [n - 1 for n in treeG]
 67    mdpv = dpv_maskG(treeG, root, pv, i2n)
 68    dpv[i2n] = mdpv
 69    return mdpv, dpv
 70
 71
 72def downstream_protection_value(out_dir, pvfile):
 73    pv, W, H = get_flat_pv(pvfile)
 74    dpv = np.zeros(pv.shape, dtype=pv.dtype)
 75    file_list = read_files(out_dir)
 76    for msgfile in file_list:
 77        msgG, root = digraph_from_messages(msgfile)
 78        treeG = shortest_propagation_tree(msgG, root)
 79        i2n = [n - 1 for n in treeG]  # TODO change to list(treeG)
 80        mdpv = dpv_maskG(treeG, root, pv, i2n)
 81        dpv[i2n] += mdpv
 82        # plot_pv( dpv, w=W, h=H)
 83    return dpv / len(file_list)
 84
 85
 86def canon3(afile):
 87    # NO IMPLEMENTADO
 88    G = nx.read_edgelist(
 89        path=afile, create_using=nx.DiGraph(), nodetype=np.int32, data=[("time", np.int16)], delimiter=","
 90    )
 91    return G
 92
 93
 94def canon4(afile):
 95    G = nx.read_edgelist(
 96        path=afile,
 97        create_using=nx.DiGraph(),
 98        nodetype=np.int32,
 99        data=[("time", np.int16), ("ros", np.float32)],
100        delimiter=",",
101    )
102    return G
103
104
105def digraph_from_messages(afile):
106    """Not checking if file exists or if size > 0
107    This is done previously on read_files
108    """
109    data = np.loadtxt(
110        afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16)], usecols=(0, 1, 2)
111    )
112    root = data[0][0]  # checkar que el primer valor del message sea el punto de ignición
113    G = nx.DiGraph()
114    G.add_weighted_edges_from(data)
115    return G, root
116
117
118func = np.vectorize(lambda x, y: {"time": x, "ros": y})
119
120
121def custom4(afile):
122    data = np.loadtxt(
123        afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16), ("ros", np.float32)]
124    )
125    root = data[0][0]
126    G = nx.DiGraph()
127    bunch = np.vstack((data["i"], data["j"], func(data["time"], data["ros"]))).T
128    G.add_edges_from(bunch)
129    return G.add_edges_from(bunch), root
130
131
132def shortest_propagation_tree(G, root):
133    """construct a tree with the all shortest path from root
134    TODO try accumulating already added edges for checking before asigning should be faster?
135    """
136    # { node : [root,...,node], ... }
137    shortest_paths = nx.single_source_dijkstra_path(G, root, weight="time")
138    del shortest_paths[root]
139    T = nx.DiGraph()
140    for node, shopat in shortest_paths.items():
141        for i, node in enumerate(shopat[:-1]):
142            T.add_edge(node, shopat[i + 1])
143    return T
144
145
146def recursiveUp(G):
147    """count up WTF!!!
148    leafs = [x for x in T.nodes if T.out_degree(x)==0]
149    """
150    for i in G.nodes:
151        G.nodes[i]["dv"] = 1
152
153        # G.nodes[i]['dv']=0
154    # for leaf in (x for x in G.nodes if G.out_degree(x)==0):
155    #    G.nodes[leaf]['dv']=1
156    def count_up(G, j):
157        for i in G.predecessors(j):
158            # G.nodes[i]['dv']+=G.nodes[j]['dv']
159            G.nodes[i]["dv"] += 1
160            print(i, j, G.nodes[i]["dv"])
161            count_up(G, i)
162
163    for leaf in (x for x in G.nodes if G.out_degree(x) == 0):
164        count_up(G, leaf)
165
166
167def dpv_maskG(G, root, pv, i2n=None):
168    """calculate downstream protection value in a flat protection value raster
169    i2n = [n for n in treeG.nodes]
170    1. copies a slice of pv, only Graph nodes
171    2. recursively sums downstream for all succesors of the graph (starting from root)
172    3. returns the slice (range(len(G) indexed)
173    G must be a tree
174    """
175    if i2n is None:
176        i2n = [n - 1 for n in treeG]
177    # -1 ok?
178    mdpv = pv[i2n]
179
180    # assert mdpv.base is None ,'the slice is not a copy!'
181    def recursion(G, i, pv, mdpv, i2n):
182        for j in G.successors(i):
183            mdpv[i2n.index(i - 1)] += recursion(G, j, pv, mdpv, i2n)
184        return mdpv[i2n.index(i - 1)]
185
186    recursion(G, root, pv, mdpv, i2n)
187    return mdpv
188
189
190def recursion2(G: DiGraph, i: int, mdpv: ndarray, i2n: list[int]) -> ndarray:
191    for j in G.successors(i):
192        mdpv[i2n.index(i)] += recursion2(G, j, mdpv, i2n)
193    return mdpv[i2n.index(i)]
194
195
196def add_dpv_graph(G, root, pv):
197    """modifies the input Graph recursively:
198        1. sums pv into predecesor (or calling)
199        2. recursively sums downstream for all succesors
200        3. returns itself if no successors
201    G must be a tree with 'dv'
202    hence returns nothing
203    """
204    for n in G.nodes:
205        G.nodes[n]["dv"] += pv[n - 1]
206
207    def recursion(G, i):
208        for j in G.successors(i):
209            G.nodes[i]["dv"] += recursion(G, j)
210        return G.nodes[i]["dv"]
211
212    recursion(G, root)
213
214
215def sum_dpv_graph(T, root, pv):
216    """returns a copy of T that:
217        1. sets pv into each node
218        2. recursively sums pv downstream
219    T must be a tree (not checked)
220    """
221    G = T.copy()
222    for i in G.nodes:
223        G.nodes[i]["dv"] = pv[i - 1]
224
225    def recursion(G, i):
226        for j in G.successors(i):
227            G.nodes[i]["dv"] += recursion(G, j)
228        return G.nodes[i]["dv"]
229
230    recursion(G, root)
231    return G
232
233
234def count_downstream_graph(T, root) -> nx.DiGraph:
235    """returns a new Graph with node values of the number of conected nodes downstream"""
236    assert nx.is_tree(T), "not tree"
237    G = T.copy()
238    for i in G.nodes:
239        G.nodes[i]["dv"] = 1
240
241    def recursion(G, i):
242        for j in G.successors(i):
243            G.nodes[i]["dv"] += recursion(G, j)
244        return G.nodes[i]["dv"]
245
246    recursion(G, root)
247    return G
248
249
250def glob_int_sorted(directory: Path, filename: str = "MessagesFile.csv"):
251    """reads all MessagesFile<int>.csv >0 size files from directory, regexes numbers casting to int, sorts them
252    Args:
253        directory (Path): directory to read
254        filename (str): filename to match
255    Returns:
256        list: ordered list of pathlib files
257    """
258    if not directory.is_dir():
259        raise NotADirectoryError(f"{directory=} is not a directory")
260    file_name = Path(filename)
261    file_list = [f for f in directory.glob(file_name.stem + "[0-9]*" + file_name.suffix) if f.stat().st_size > 0]
262    if len(file_list) == 0:
263        raise FileNotFoundError(f"{directory=} has no {file_name=} matching >0 size files")
264    file_string = " ".join([f.stem for f in file_list])
265    # sort
266    sim_num = np.fromiter(re.findall("([0-9]+)", file_string), dtype=int, count=len(file_list))
267    asort = np.argsort(sim_num)
268    sim_num = sim_num[asort]
269    file_list = list(np.array(file_list)[asort])
270    return file_list
271
272
273def get_flat_pv(afile):
274    """opens the file with gdal as raster, get 1st band, flattens it
275    also returns width and height
276    """
277    dataset = gdal.Open(str(afile), gdal.GA_ReadOnly)
278    return dataset.GetRasterBand(1).ReadAsArray().ravel(), dataset.RasterXSize, dataset.RasterYSize
279
280
281def plot(G, pos=None, labels=None):
282    """matplotlib
283    TODO scientific format numeric labels
284    """
285    if not pos:
286        pos = {node: [*id2xy(node)] for node in G.nodes}
287    if not labels:
288        labes = {node: node for node in G.nodes}
289    plt.figure()
290    nx.draw(G, pos=pos, with_labels=False)
291    nx.draw_networkx_labels(G, pos=pos, labels=labels)
292    return plt.show()
293
294
295def plot_pv(pv, w=40, h=40):
296    mat = pv.reshape(h, w)
297    plt.matshow(mat)
298    plt.show()
299
300
301def worker(data, pv, sid):
302
303    # digraph_from_messages(msgfile) -> msgG, root
304    msgG = DiGraph()
305    msgG.add_weighted_edges_from(data)
306    root = data[0][0]
307    # shortest_propagation_tree(G, root) -> treeG
308    shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time")
309    del shortest_paths[root]
310    treeG = DiGraph()
311    for node, shopat in shortest_paths.items():
312        for i, node in enumerate(shopat[:-1]):
313            treeG.add_edge(node, shopat[i + 1])
314    # dpv_maskG(G, root, pv, i2n) -> mdpv
315    i2n = [n for n in treeG]
316    mdpv = pv[i2n]
317    recursion(treeG, root, mdpv, i2n)
318    # dpv[i2n] += mdpv
319    return mdpv, i2n, sid
320
321
322from re import search
323
324
325def load_msg(afile: Path):
326    try:
327        sim_id = search(r"\d+", afile.stem).group(0)
328    except:
329        sim_id = "-1"
330    data = np.loadtxt(
331        afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)], usecols=(0, 1, 2), ndmin=1
332    )
333    return data, sim_id
334
335
336def get_data(files, callback=None):
337    data = []
338    for count, afile in enumerate(files):
339        sim_id = search("\\d+", afile.stem).group(0)
340        data += [
341            np.loadtxt(
342                afile,
343                delimiter=",",
344                dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)],
345                usecols=(0, 1, 2),
346                ndmin=1,
347            )
348        ]
349        # 1 based to 0 based
350        data[-1]["i"] -= 1
351        data[-1]["j"] -= 1
352        if callback:
353            callback(count, sim_id, afile)
354        yield data
355    with open("messages.pickle", "wb") as f:
356        pickle_dump(data, f)
357
358
359def one_sim_work(afile, pv, sid):
360    # digraph_from_messages(msgfile) -> msgG, root
361    data = np.loadtxt(
362        afile,
363        delimiter=",",
364        dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)],
365        usecols=(0, 1, 2),
366        ndmin=1,
367    )
368    # 1 based to 0 based
369    data["i"] -= 1
370    data["j"] -= 1
371    msgG = nx.DiGraph()
372    msgG.add_weighted_edges_from(data)
373    root = data[0][0]
374    # shortest_propagation_tree(G, root) -> treeG
375    shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time")
376    del shortest_paths[root]
377    treeG = nx.DiGraph()
378    for node, shopat in shortest_paths.items():
379        for i, node in enumerate(shopat[:-1]):
380            treeG.add_edge(node, shopat[i + 1])
381    # dpv_maskG(G, root, pv, i2n) -> mdpv
382    i2n = [n for n in treeG]
383    mdpv = pv[i2n]
384    recursion2(treeG, root, mdpv, i2n)
385    # dpv[i2n] += mdpv
386    print("fin", sid)
387    return mdpv, i2n, sid
388
389
390def argument_parser(argv):
391    """parse arguments
392    - logger: verbosity, logfile
393    """
394    import argparse
395
396    def path_check(path):
397        path = Path(path)
398        if path.is_dir() or (path.is_file() and path.stat().st_size > 0):
399            return path
400        raise argparse.ArgumentTypeError(f"{path.as_uri()=} is not a valid Path or file size is 0")
401
402    parser = argparse.ArgumentParser(
403        description=__doc__,
404        formatter_class=argparse.RawDescriptionHelpFormatter,
405    )
406    parser.add_argument(
407        "-v",
408        "--verbosity",
409        help="increase output verbosity (0: warning(default), 1: info, 2>=: debug)",
410        action="count",
411        default=0,
412    )
413    parser.add_argument("-l", "--logfile", help="rotating log file see fire2a.setup_logger", type=Path)
414    parser.add_argument(
415        "-dpvn",
416        "--dpv-filename",
417        help="dpv raster layer output filename",
418        type=str,
419        default="dpv.tif",
420    )
421    parser.add_argument(
422        "-pv",
423        "--protection-value",
424        help="pv raster layer, must be gdal.Open compatible, and match messages width & height",
425        type=path_check,
426    )
427    parser.add_argument(
428        "-pvb",
429        "--protection-value-band",
430        help="pv raster layer band to read, defaults to 1",
431        type=int,
432        default=1,
433    )
434    parser.add_argument("-W", help="width of the raster (overrides width read from pv raster)", type=int)
435    parser.add_argument("-H", help="height of the raster (overrides width read from pv raster)", type=int)
436    parser.add_argument(
437        nargs="+",
438        dest="messages",
439        help=(
440            "Messages[0-9+].csv file(s) or directory to parse (hint use glob: Messages*.csv to pass many files or pass"
441            " a single directory to read all Messages[0-9+].csv files in it)"
442        ),
443        type=path_check,
444    )
445    return parser.parse_args(argv)
446
447
448def main(argv=None):
449    """steps to run dpv"""
450    if argv is None:
451        argv = sys.argv[1:]
452    args = argument_parser(argv)
453    logger = setup_logger(__name__, args.verbosity, args.logfile)
454    logger.info(f"{args=}")
455    logger.debug("debugging...")
456
457    file_list = []
458    for path_str in args.messages:
459        path = Path(path_str)
460        if path.is_file() and path.stat().st_size > 0:
461            file_list += [path]
462        elif path.is_dir():
463            file_list += glob_int_sorted(path)
464        else:
465            logger.warning(f"{path} is not a file or directory")
466    # remove duplicates
467    file_list = list(dict.fromkeys(file_list))
468    logger.info(f"{file_list=}")
469
470    if args.protection_value:
471        pv_data, pv_info = read_raster(str(args.protection_value), args.protection_value_band)
472        W, H, GT, PJ = pv_info["RasterXSize"], pv_info["RasterYSize"], pv_info["Transform"], pv_info["Projection"]
473        # make 1D
474        pv = pv_data.ravel()
475        logger.info(f"{pv_data.dtype=}, {pv_data.shape=}, {pv_info=}")
476        dpv = np.zeros(pv.shape, dtype=pv.dtype)
477    elif args.W and args.H:
478        W, H = args.W, args.H
479        GT = (0, 1, 0, 0, 0, 1)
480        PJ = "EPSG:4326"
481        pv = np.ones(W * H, dtype=np.int32)
482        dpv = np.zeros(W * H, dtype=np.int32)
483
484    from multiprocessing import Pool, cpu_count
485
486    with Pool(processes=cpu_count() - 1) as pool:
487        results = [pool.apply_async(one_sim_work, args=(afile, pv, i)) for i, afile in enumerate(file_list)]
488        for result in results:
489            sdpv, si2n, sid = result.get()
490            dpv[si2n] += sdpv
491    # scale
492    dpv = dpv / len(file_list)
493    print(f"{dpv=}")
494    # write
495    dst_ds = gdal.GetDriverByName("GTiff").Create(args.dpv_filename, W, H, 1, gdal.GDT_Float32)
496    # get driver by name to create a geo tiff raster
497    dst_ds.SetGeoTransform(GT)
498    dst_ds.SetProjection(PJ)
499    band = dst_ds.GetRasterBand(1)
500    band.SetUnitType("protection_value")
501    if 0 != band.SetNoDataValue(0):
502        feedback.pushWarning(f"Set No Data failed for {self.OUT_R}")
503    if 0 != band.WriteArray(np.float32(dpv.reshape(H, W))):
504        feedback.pushWarning(f"WriteArray failed for {self.OUT_R}")
505    band.FlushCache()
506    dst_ds.FlushCache()
507    dst_ds = None
508    return 0
509
510
511if __name__ == "__main__":
512    sys.exit(main())
513
514
515def no():
516    # if len(sys.argv)>1:
517    #     input_dir = sys.argv[1]
518    #     output_dir = sys.argv[2]
519    # else:
520    print("run in C2FSB folder")
521    # input_dir = Path.cwd() / 'data'
522    input_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004")
523    # output_dir = Path.cwd() / 'results'
524    output_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004/results")
525    print(input_dir, output_dir)
526    assert input_dir.is_dir() and output_dir.is_dir()
527
528    # abro el directorio de los messages como una lista con los nombres de los archivos
529    file_list = read_files(output_dir)
530
531    # agarrar la capa que ocuparemos como valor a proteger
532    ## pv: valores en riesgo
533    ## W: Width
534    ## H: Height
535    pv, W, H = get_flat_pv(input_dir / "fuels.asc")
536
537    #
538    # single simulation
539    #
540    afile = file_list[0]
541    msgG, root = digraph_from_messages(afile)
542    pos = {node: [*id2xy(node)] for node in msgG}
543    treeG = shortest_propagation_tree(msgG, root)
544
545    # count the number of nodes downstream
546    countG = count_downstream_graph(treeG, root)
547
548    # asignar el número de nodos aguas abajo a cada nodo respectivamente
549    countGv = {n: countG.nodes[n]["dv"] for n in countG}
550    plot(countG, pos=pos, labels=countGv)
551    # {'dv': 137} == 137 root connects all tree
552    assert countG.nodes[root]["dv"] == len(countG), "count_downstream at root is not the same as number of nodes!"
553    #
554    onev = np.ones(pv.shape)
555    #
556    # sum dpv=1
557    sumG = sum_dpv_graph(treeG, root, onev)
558    sumGv = {n: sumG.nodes[n]["dv"] for n in sumG}
559    plot(sumG, pos=pos, labels=sumGv)
560    assert np.all([sumGv[n] == countGv[n] for n in treeG.nodes]), "sum_dpv(pv=1) != countG values!"
561    #
562    # add dpv=1
563    addG = treeG.copy()
564    for n in addG.nodes:
565        addG.nodes[n]["dv"] = 0
566    add_dpv_graph(addG, root, onev)
567    addGv = {n: addG.nodes[n]["dv"] for n in addG}
568    plot(addG, pos=pos, labels=addGv)
569    assert np.all([addGv[n] == countGv[n] for n in treeG.nodes]), "add_dpv(pv=1) != countG values!"
570    #
571    # cum dpv=1
572    dpv = np.zeros(pv.shape, dtype=pv.dtype)
573    i2n = [n - 1 for n in treeG]
574    mdpv = dpv_maskG(treeG, root, onev, i2n)
575    dpv[i2n] = mdpv
576    plot_pv(dpv, w=W, h=H)
577    assert np.all([mdpv[i2n.index(n - 1)] == countGv[n] for n in treeG.nodes]), "dpv_maskG(pv=1) != countG values!"
578    #
579    # single full test
580    mdpv, dpv = single_simulation_downstream_protection_value(msgfile=afile, pvfile=input_dir / "bp.asc")
581    plot_pv(dpv, w=W, h=H)
582    plot(treeG, pos=pos, labels={n: np.format_float_scientific(dpv[n], precision=2) for n in treeG})
583    assert np.all(np.isclose(mdpv, dpv[i2n])), "dpv_maskG != dpvalues!"
584    #
585    # finally
586    dpv = downstream_protection_value(output_dir, pvfile=input_dir / "bp.asc")
587    plot_pv(dpv, w=W, h=H)
588
589
590"""
591$cd C2FSB
592C2FSB$ python3 downstream_protection_value.py
593
594all functions are tested and plotted on main
595
596Calculate downstream protection value from Messages/MessagesFile<int>.csv files
597Each file has 4 columns: from cellId, to cellId, period when burns & hit ROS
598
599https://github.com/fire2a/C2FK/blob/main/Cell2Fire/Heuristics.py
600
601https://networkx.org/documentation/networkx-1.8/reference/algorithms.shortest_paths.html
602
603propagation tree: (c) fire shortest traveling times
604
605Performance review
6061. is faster to dijkstra than minimun spanning
607
608    In [50]: %timeit shortest_propagation_tree(G,root)
609    1.53 ms ± 5.47 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
610    
611    In [51]: %timeit nx.minimum_spanning_arborescence(G, attr='time')
612    16.4 ms ± 61 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
613
6142. is faster numpy+add_edges than nx.from_csv
615
616    In [63]: %timeit custom4(afile)
617    2.3 ms ± 32 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
618    
619    In [64]: %timeit canon4(afile)
620    3.35 ms ± 20 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
621
6222.1 even faster is you discard a column!!
623    In [65]: %timeit digraph_from_messages(afile)
624    1.84 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
625
626"""
logger = <Logger fire2a.downstream_protection_value (WARNING)>
def id2xy(idx, w=40, h=40):
47def id2xy(idx, w=40, h=40):
48    """idx: index, w: width, h:height"""
49    return r_id2xy(idx - 1, w, h)

idx: index, w: width, h:height

def single_simulation_downstream_protection_value(msgfile='MessagesFile01.csv', pvfile='py.asc'):
60def single_simulation_downstream_protection_value(msgfile="MessagesFile01.csv", pvfile="py.asc"):
61    """load one diGraph count succesors"""
62    msgG, root = digraph_from_messages(msgfile)
63    treeG = shortest_propagation_tree(msgG, root)
64    pv, W, H = get_flat_pv(pvfile)
65    #
66    dpv = np.zeros(pv.shape, dtype=pv.dtype)
67    i2n = [n - 1 for n in treeG]
68    mdpv = dpv_maskG(treeG, root, pv, i2n)
69    dpv[i2n] = mdpv
70    return mdpv, dpv

load one diGraph count succesors

def downstream_protection_value(out_dir, pvfile):
73def downstream_protection_value(out_dir, pvfile):
74    pv, W, H = get_flat_pv(pvfile)
75    dpv = np.zeros(pv.shape, dtype=pv.dtype)
76    file_list = read_files(out_dir)
77    for msgfile in file_list:
78        msgG, root = digraph_from_messages(msgfile)
79        treeG = shortest_propagation_tree(msgG, root)
80        i2n = [n - 1 for n in treeG]  # TODO change to list(treeG)
81        mdpv = dpv_maskG(treeG, root, pv, i2n)
82        dpv[i2n] += mdpv
83        # plot_pv( dpv, w=W, h=H)
84    return dpv / len(file_list)
def canon3(afile):
87def canon3(afile):
88    # NO IMPLEMENTADO
89    G = nx.read_edgelist(
90        path=afile, create_using=nx.DiGraph(), nodetype=np.int32, data=[("time", np.int16)], delimiter=","
91    )
92    return G
def canon4(afile):
 95def canon4(afile):
 96    G = nx.read_edgelist(
 97        path=afile,
 98        create_using=nx.DiGraph(),
 99        nodetype=np.int32,
100        data=[("time", np.int16), ("ros", np.float32)],
101        delimiter=",",
102    )
103    return G
def digraph_from_messages(afile):
106def digraph_from_messages(afile):
107    """Not checking if file exists or if size > 0
108    This is done previously on read_files
109    """
110    data = np.loadtxt(
111        afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16)], usecols=(0, 1, 2)
112    )
113    root = data[0][0]  # checkar que el primer valor del message sea el punto de ignición
114    G = nx.DiGraph()
115    G.add_weighted_edges_from(data)
116    return G, root

Not checking if file exists or if size > 0 This is done previously on read_files

func = <numpy.vectorize object>
def custom4(afile):
122def custom4(afile):
123    data = np.loadtxt(
124        afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16), ("ros", np.float32)]
125    )
126    root = data[0][0]
127    G = nx.DiGraph()
128    bunch = np.vstack((data["i"], data["j"], func(data["time"], data["ros"]))).T
129    G.add_edges_from(bunch)
130    return G.add_edges_from(bunch), root
def shortest_propagation_tree(G, root):
133def shortest_propagation_tree(G, root):
134    """construct a tree with the all shortest path from root
135    TODO try accumulating already added edges for checking before asigning should be faster?
136    """
137    # { node : [root,...,node], ... }
138    shortest_paths = nx.single_source_dijkstra_path(G, root, weight="time")
139    del shortest_paths[root]
140    T = nx.DiGraph()
141    for node, shopat in shortest_paths.items():
142        for i, node in enumerate(shopat[:-1]):
143            T.add_edge(node, shopat[i + 1])
144    return T

construct a tree with the all shortest path from root TODO try accumulating already added edges for checking before asigning should be faster?

def recursiveUp(G):
147def recursiveUp(G):
148    """count up WTF!!!
149    leafs = [x for x in T.nodes if T.out_degree(x)==0]
150    """
151    for i in G.nodes:
152        G.nodes[i]["dv"] = 1
153
154        # G.nodes[i]['dv']=0
155    # for leaf in (x for x in G.nodes if G.out_degree(x)==0):
156    #    G.nodes[leaf]['dv']=1
157    def count_up(G, j):
158        for i in G.predecessors(j):
159            # G.nodes[i]['dv']+=G.nodes[j]['dv']
160            G.nodes[i]["dv"] += 1
161            print(i, j, G.nodes[i]["dv"])
162            count_up(G, i)
163
164    for leaf in (x for x in G.nodes if G.out_degree(x) == 0):
165        count_up(G, leaf)

count up WTF!!! leafs = [x for x in T.nodes if T.out_degree(x)==0]

def dpv_maskG(G, root, pv, i2n=None):
168def dpv_maskG(G, root, pv, i2n=None):
169    """calculate downstream protection value in a flat protection value raster
170    i2n = [n for n in treeG.nodes]
171    1. copies a slice of pv, only Graph nodes
172    2. recursively sums downstream for all succesors of the graph (starting from root)
173    3. returns the slice (range(len(G) indexed)
174    G must be a tree
175    """
176    if i2n is None:
177        i2n = [n - 1 for n in treeG]
178    # -1 ok?
179    mdpv = pv[i2n]
180
181    # assert mdpv.base is None ,'the slice is not a copy!'
182    def recursion(G, i, pv, mdpv, i2n):
183        for j in G.successors(i):
184            mdpv[i2n.index(i - 1)] += recursion(G, j, pv, mdpv, i2n)
185        return mdpv[i2n.index(i - 1)]
186
187    recursion(G, root, pv, mdpv, i2n)
188    return mdpv

calculate downstream protection value in a flat protection value raster i2n = [n for n in treeG.nodes]

  1. copies a slice of pv, only Graph nodes
  2. recursively sums downstream for all succesors of the graph (starting from root)
  3. returns the slice (range(len(G) indexed) G must be a tree
def recursion2( G: networkx.classes.digraph.DiGraph, i: int, mdpv: numpy.ndarray, i2n: list[int]) -> numpy.ndarray:
191def recursion2(G: DiGraph, i: int, mdpv: ndarray, i2n: list[int]) -> ndarray:
192    for j in G.successors(i):
193        mdpv[i2n.index(i)] += recursion2(G, j, mdpv, i2n)
194    return mdpv[i2n.index(i)]
def add_dpv_graph(G, root, pv):
197def add_dpv_graph(G, root, pv):
198    """modifies the input Graph recursively:
199        1. sums pv into predecesor (or calling)
200        2. recursively sums downstream for all succesors
201        3. returns itself if no successors
202    G must be a tree with 'dv'
203    hence returns nothing
204    """
205    for n in G.nodes:
206        G.nodes[n]["dv"] += pv[n - 1]
207
208    def recursion(G, i):
209        for j in G.successors(i):
210            G.nodes[i]["dv"] += recursion(G, j)
211        return G.nodes[i]["dv"]
212
213    recursion(G, root)

modifies the input Graph recursively: 1. sums pv into predecesor (or calling) 2. recursively sums downstream for all succesors 3. returns itself if no successors G must be a tree with 'dv' hence returns nothing

def sum_dpv_graph(T, root, pv):
216def sum_dpv_graph(T, root, pv):
217    """returns a copy of T that:
218        1. sets pv into each node
219        2. recursively sums pv downstream
220    T must be a tree (not checked)
221    """
222    G = T.copy()
223    for i in G.nodes:
224        G.nodes[i]["dv"] = pv[i - 1]
225
226    def recursion(G, i):
227        for j in G.successors(i):
228            G.nodes[i]["dv"] += recursion(G, j)
229        return G.nodes[i]["dv"]
230
231    recursion(G, root)
232    return G

returns a copy of T that: 1. sets pv into each node 2. recursively sums pv downstream T must be a tree (not checked)

def count_downstream_graph(T, root) -> networkx.classes.digraph.DiGraph:
235def count_downstream_graph(T, root) -> nx.DiGraph:
236    """returns a new Graph with node values of the number of conected nodes downstream"""
237    assert nx.is_tree(T), "not tree"
238    G = T.copy()
239    for i in G.nodes:
240        G.nodes[i]["dv"] = 1
241
242    def recursion(G, i):
243        for j in G.successors(i):
244            G.nodes[i]["dv"] += recursion(G, j)
245        return G.nodes[i]["dv"]
246
247    recursion(G, root)
248    return G

returns a new Graph with node values of the number of conected nodes downstream

def glob_int_sorted(directory: pathlib.Path, filename: str = 'MessagesFile.csv'):
251def glob_int_sorted(directory: Path, filename: str = "MessagesFile.csv"):
252    """reads all MessagesFile<int>.csv >0 size files from directory, regexes numbers casting to int, sorts them
253    Args:
254        directory (Path): directory to read
255        filename (str): filename to match
256    Returns:
257        list: ordered list of pathlib files
258    """
259    if not directory.is_dir():
260        raise NotADirectoryError(f"{directory=} is not a directory")
261    file_name = Path(filename)
262    file_list = [f for f in directory.glob(file_name.stem + "[0-9]*" + file_name.suffix) if f.stat().st_size > 0]
263    if len(file_list) == 0:
264        raise FileNotFoundError(f"{directory=} has no {file_name=} matching >0 size files")
265    file_string = " ".join([f.stem for f in file_list])
266    # sort
267    sim_num = np.fromiter(re.findall("([0-9]+)", file_string), dtype=int, count=len(file_list))
268    asort = np.argsort(sim_num)
269    sim_num = sim_num[asort]
270    file_list = list(np.array(file_list)[asort])
271    return file_list

reads all MessagesFile.csv >0 size files from directory, regexes numbers casting to int, sorts them Args: directory (Path): directory to read filename (str): filename to match Returns: list: ordered list of pathlib files

def get_flat_pv(afile):
274def get_flat_pv(afile):
275    """opens the file with gdal as raster, get 1st band, flattens it
276    also returns width and height
277    """
278    dataset = gdal.Open(str(afile), gdal.GA_ReadOnly)
279    return dataset.GetRasterBand(1).ReadAsArray().ravel(), dataset.RasterXSize, dataset.RasterYSize

opens the file with gdal as raster, get 1st band, flattens it also returns width and height

def plot(G, pos=None, labels=None):
282def plot(G, pos=None, labels=None):
283    """matplotlib
284    TODO scientific format numeric labels
285    """
286    if not pos:
287        pos = {node: [*id2xy(node)] for node in G.nodes}
288    if not labels:
289        labes = {node: node for node in G.nodes}
290    plt.figure()
291    nx.draw(G, pos=pos, with_labels=False)
292    nx.draw_networkx_labels(G, pos=pos, labels=labels)
293    return plt.show()

matplotlib TODO scientific format numeric labels

def plot_pv(pv, w=40, h=40):
296def plot_pv(pv, w=40, h=40):
297    mat = pv.reshape(h, w)
298    plt.matshow(mat)
299    plt.show()
def worker(data, pv, sid):
302def worker(data, pv, sid):
303
304    # digraph_from_messages(msgfile) -> msgG, root
305    msgG = DiGraph()
306    msgG.add_weighted_edges_from(data)
307    root = data[0][0]
308    # shortest_propagation_tree(G, root) -> treeG
309    shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time")
310    del shortest_paths[root]
311    treeG = DiGraph()
312    for node, shopat in shortest_paths.items():
313        for i, node in enumerate(shopat[:-1]):
314            treeG.add_edge(node, shopat[i + 1])
315    # dpv_maskG(G, root, pv, i2n) -> mdpv
316    i2n = [n for n in treeG]
317    mdpv = pv[i2n]
318    recursion(treeG, root, mdpv, i2n)
319    # dpv[i2n] += mdpv
320    return mdpv, i2n, sid
def load_msg(afile: pathlib.Path):
326def load_msg(afile: Path):
327    try:
328        sim_id = search(r"\d+", afile.stem).group(0)
329    except:
330        sim_id = "-1"
331    data = np.loadtxt(
332        afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)], usecols=(0, 1, 2), ndmin=1
333    )
334    return data, sim_id
def get_data(files, callback=None):
337def get_data(files, callback=None):
338    data = []
339    for count, afile in enumerate(files):
340        sim_id = search("\\d+", afile.stem).group(0)
341        data += [
342            np.loadtxt(
343                afile,
344                delimiter=",",
345                dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)],
346                usecols=(0, 1, 2),
347                ndmin=1,
348            )
349        ]
350        # 1 based to 0 based
351        data[-1]["i"] -= 1
352        data[-1]["j"] -= 1
353        if callback:
354            callback(count, sim_id, afile)
355        yield data
356    with open("messages.pickle", "wb") as f:
357        pickle_dump(data, f)
def one_sim_work(afile, pv, sid):
360def one_sim_work(afile, pv, sid):
361    # digraph_from_messages(msgfile) -> msgG, root
362    data = np.loadtxt(
363        afile,
364        delimiter=",",
365        dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)],
366        usecols=(0, 1, 2),
367        ndmin=1,
368    )
369    # 1 based to 0 based
370    data["i"] -= 1
371    data["j"] -= 1
372    msgG = nx.DiGraph()
373    msgG.add_weighted_edges_from(data)
374    root = data[0][0]
375    # shortest_propagation_tree(G, root) -> treeG
376    shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time")
377    del shortest_paths[root]
378    treeG = nx.DiGraph()
379    for node, shopat in shortest_paths.items():
380        for i, node in enumerate(shopat[:-1]):
381            treeG.add_edge(node, shopat[i + 1])
382    # dpv_maskG(G, root, pv, i2n) -> mdpv
383    i2n = [n for n in treeG]
384    mdpv = pv[i2n]
385    recursion2(treeG, root, mdpv, i2n)
386    # dpv[i2n] += mdpv
387    print("fin", sid)
388    return mdpv, i2n, sid
def argument_parser(argv):
391def argument_parser(argv):
392    """parse arguments
393    - logger: verbosity, logfile
394    """
395    import argparse
396
397    def path_check(path):
398        path = Path(path)
399        if path.is_dir() or (path.is_file() and path.stat().st_size > 0):
400            return path
401        raise argparse.ArgumentTypeError(f"{path.as_uri()=} is not a valid Path or file size is 0")
402
403    parser = argparse.ArgumentParser(
404        description=__doc__,
405        formatter_class=argparse.RawDescriptionHelpFormatter,
406    )
407    parser.add_argument(
408        "-v",
409        "--verbosity",
410        help="increase output verbosity (0: warning(default), 1: info, 2>=: debug)",
411        action="count",
412        default=0,
413    )
414    parser.add_argument("-l", "--logfile", help="rotating log file see fire2a.setup_logger", type=Path)
415    parser.add_argument(
416        "-dpvn",
417        "--dpv-filename",
418        help="dpv raster layer output filename",
419        type=str,
420        default="dpv.tif",
421    )
422    parser.add_argument(
423        "-pv",
424        "--protection-value",
425        help="pv raster layer, must be gdal.Open compatible, and match messages width & height",
426        type=path_check,
427    )
428    parser.add_argument(
429        "-pvb",
430        "--protection-value-band",
431        help="pv raster layer band to read, defaults to 1",
432        type=int,
433        default=1,
434    )
435    parser.add_argument("-W", help="width of the raster (overrides width read from pv raster)", type=int)
436    parser.add_argument("-H", help="height of the raster (overrides width read from pv raster)", type=int)
437    parser.add_argument(
438        nargs="+",
439        dest="messages",
440        help=(
441            "Messages[0-9+].csv file(s) or directory to parse (hint use glob: Messages*.csv to pass many files or pass"
442            " a single directory to read all Messages[0-9+].csv files in it)"
443        ),
444        type=path_check,
445    )
446    return parser.parse_args(argv)

parse arguments

  • logger: verbosity, logfile
def main(argv=None):
449def main(argv=None):
450    """steps to run dpv"""
451    if argv is None:
452        argv = sys.argv[1:]
453    args = argument_parser(argv)
454    logger = setup_logger(__name__, args.verbosity, args.logfile)
455    logger.info(f"{args=}")
456    logger.debug("debugging...")
457
458    file_list = []
459    for path_str in args.messages:
460        path = Path(path_str)
461        if path.is_file() and path.stat().st_size > 0:
462            file_list += [path]
463        elif path.is_dir():
464            file_list += glob_int_sorted(path)
465        else:
466            logger.warning(f"{path} is not a file or directory")
467    # remove duplicates
468    file_list = list(dict.fromkeys(file_list))
469    logger.info(f"{file_list=}")
470
471    if args.protection_value:
472        pv_data, pv_info = read_raster(str(args.protection_value), args.protection_value_band)
473        W, H, GT, PJ = pv_info["RasterXSize"], pv_info["RasterYSize"], pv_info["Transform"], pv_info["Projection"]
474        # make 1D
475        pv = pv_data.ravel()
476        logger.info(f"{pv_data.dtype=}, {pv_data.shape=}, {pv_info=}")
477        dpv = np.zeros(pv.shape, dtype=pv.dtype)
478    elif args.W and args.H:
479        W, H = args.W, args.H
480        GT = (0, 1, 0, 0, 0, 1)
481        PJ = "EPSG:4326"
482        pv = np.ones(W * H, dtype=np.int32)
483        dpv = np.zeros(W * H, dtype=np.int32)
484
485    from multiprocessing import Pool, cpu_count
486
487    with Pool(processes=cpu_count() - 1) as pool:
488        results = [pool.apply_async(one_sim_work, args=(afile, pv, i)) for i, afile in enumerate(file_list)]
489        for result in results:
490            sdpv, si2n, sid = result.get()
491            dpv[si2n] += sdpv
492    # scale
493    dpv = dpv / len(file_list)
494    print(f"{dpv=}")
495    # write
496    dst_ds = gdal.GetDriverByName("GTiff").Create(args.dpv_filename, W, H, 1, gdal.GDT_Float32)
497    # get driver by name to create a geo tiff raster
498    dst_ds.SetGeoTransform(GT)
499    dst_ds.SetProjection(PJ)
500    band = dst_ds.GetRasterBand(1)
501    band.SetUnitType("protection_value")
502    if 0 != band.SetNoDataValue(0):
503        feedback.pushWarning(f"Set No Data failed for {self.OUT_R}")
504    if 0 != band.WriteArray(np.float32(dpv.reshape(H, W))):
505        feedback.pushWarning(f"WriteArray failed for {self.OUT_R}")
506    band.FlushCache()
507    dst_ds.FlushCache()
508    dst_ds = None
509    return 0

steps to run dpv

def no():
516def no():
517    # if len(sys.argv)>1:
518    #     input_dir = sys.argv[1]
519    #     output_dir = sys.argv[2]
520    # else:
521    print("run in C2FSB folder")
522    # input_dir = Path.cwd() / 'data'
523    input_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004")
524    # output_dir = Path.cwd() / 'results'
525    output_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004/results")
526    print(input_dir, output_dir)
527    assert input_dir.is_dir() and output_dir.is_dir()
528
529    # abro el directorio de los messages como una lista con los nombres de los archivos
530    file_list = read_files(output_dir)
531
532    # agarrar la capa que ocuparemos como valor a proteger
533    ## pv: valores en riesgo
534    ## W: Width
535    ## H: Height
536    pv, W, H = get_flat_pv(input_dir / "fuels.asc")
537
538    #
539    # single simulation
540    #
541    afile = file_list[0]
542    msgG, root = digraph_from_messages(afile)
543    pos = {node: [*id2xy(node)] for node in msgG}
544    treeG = shortest_propagation_tree(msgG, root)
545
546    # count the number of nodes downstream
547    countG = count_downstream_graph(treeG, root)
548
549    # asignar el número de nodos aguas abajo a cada nodo respectivamente
550    countGv = {n: countG.nodes[n]["dv"] for n in countG}
551    plot(countG, pos=pos, labels=countGv)
552    # {'dv': 137} == 137 root connects all tree
553    assert countG.nodes[root]["dv"] == len(countG), "count_downstream at root is not the same as number of nodes!"
554    #
555    onev = np.ones(pv.shape)
556    #
557    # sum dpv=1
558    sumG = sum_dpv_graph(treeG, root, onev)
559    sumGv = {n: sumG.nodes[n]["dv"] for n in sumG}
560    plot(sumG, pos=pos, labels=sumGv)
561    assert np.all([sumGv[n] == countGv[n] for n in treeG.nodes]), "sum_dpv(pv=1) != countG values!"
562    #
563    # add dpv=1
564    addG = treeG.copy()
565    for n in addG.nodes:
566        addG.nodes[n]["dv"] = 0
567    add_dpv_graph(addG, root, onev)
568    addGv = {n: addG.nodes[n]["dv"] for n in addG}
569    plot(addG, pos=pos, labels=addGv)
570    assert np.all([addGv[n] == countGv[n] for n in treeG.nodes]), "add_dpv(pv=1) != countG values!"
571    #
572    # cum dpv=1
573    dpv = np.zeros(pv.shape, dtype=pv.dtype)
574    i2n = [n - 1 for n in treeG]
575    mdpv = dpv_maskG(treeG, root, onev, i2n)
576    dpv[i2n] = mdpv
577    plot_pv(dpv, w=W, h=H)
578    assert np.all([mdpv[i2n.index(n - 1)] == countGv[n] for n in treeG.nodes]), "dpv_maskG(pv=1) != countG values!"
579    #
580    # single full test
581    mdpv, dpv = single_simulation_downstream_protection_value(msgfile=afile, pvfile=input_dir / "bp.asc")
582    plot_pv(dpv, w=W, h=H)
583    plot(treeG, pos=pos, labels={n: np.format_float_scientific(dpv[n], precision=2) for n in treeG})
584    assert np.all(np.isclose(mdpv, dpv[i2n])), "dpv_maskG != dpvalues!"
585    #
586    # finally
587    dpv = downstream_protection_value(output_dir, pvfile=input_dir / "bp.asc")
588    plot_pv(dpv, w=W, h=H)