fire2a.downstream_protection_value
DownStream Protection Value Algorithm can be reduced to recursively counting out nodes in a directed tree graph; When providing node values, recursively summing them. In the context of Cell2Fire simulator, the graph is a (or many) fire propagation tree(s), represented in a (multi) directed graph.
Its main idea is that upstream cells are priority over downstream cells because of the calculated fire propagations; So if you protect an upstream cell, fire won't get to the downstream cells; pondered by the protection values
Its inputs are:
- A directed graph(*), passed as a 'Messages.csv' file, with 'in_node', 'out_node', 'weight' columns
- [optional] a raster with the values to protect (any Non-Data, any Value types) As the aggregation function is sum, means bigger values are more protection than smaller or negative values.
Its output is:
- A raster represeting the graph with added values (or number of cells) downstream (or out nodes, recursively).
(*): If it is not a tree, it will be reduced to a tree by the shortest path algorithm
This code:
- Reads all Messages[0-9]+.csv files from a directory ([0-9]+ represents any integer number)
- Reads the protection values from a gdal compatible raster file
- Calculates the downstream protection value algorithm serially for windows, and parallel for linux
- Is hooked up to usage_template/downstream_protection_value.ipynb for graphical exploration
1#!/bin/env python3 2__author__ = "Fernando Badilla" 3__revision__ = "$Format:%H$" 4__doc__ = """ 5DownStream Protection Value Algorithm can be reduced to recursively counting out nodes in a directed tree graph; When providing node values, recursively summing them. In the context of Cell2Fire simulator, the graph is a (or many) fire propagation tree(s), represented in a (multi) directed graph. 6 7Its main idea is that upstream cells are priority over downstream cells because of the calculated fire propagations; So if you protect an upstream cell, fire won't get to the downstream cells; pondered by the protection values 8 9Its inputs are: 10 11* A directed graph(*), passed as a 'Messages.csv' file, with 'in_node', 'out_node', 'weight' columns 12* [optional] a raster with the values to protect (any Non-Data, any Value types) As the aggregation function is sum, means bigger values are more protection than smaller or negative values. 13 14Its output is: 15 16* A raster represeting the graph with added values (or number of cells) downstream (or out nodes, recursively). 17 18(*): If it is not a tree, it will be reduced to a tree by the shortest path algorithm 19 20This code: 21 22* Reads all Messages[0-9]+.csv files from a directory ([0-9]+ represents any integer number) 23* Reads the protection values from a gdal compatible raster file 24* Calculates the downstream protection value algorithm serially for windows, and parallel for linux 25* Is hooked up to usage_template/downstream_protection_value.ipynb for graphical exploration 26""" 27import logging 28import re 29import sys 30from pathlib import Path 31 32import networkx as nx 33import numpy as np 34from matplotlib import pyplot as plt 35from networkx import DiGraph 36from numpy import ndarray 37from osgeo import gdal 38 39from . import setup_logger 40from .raster import id2xy as r_id2xy 41from .raster import read_raster 42 43logger = logging.getLogger(__name__) 44 45 46def id2xy(idx, w=40, h=40): 47 """idx: index, w: width, h:height""" 48 return r_id2xy(idx - 1, w, h) 49 50 51# def id2xy(idx, w=40, h=40): 52# """idx: index, w: width, h:height""" 53# idx -= 1 54# return idx % w, idx // w 55 56# def read_raster(filename: str, band: int = 1, data: bool = True, info: bool = True) -> tuple[np.ndarray, dict]: 57 58 59def single_simulation_downstream_protection_value(msgfile="MessagesFile01.csv", pvfile="py.asc"): 60 """load one diGraph count succesors""" 61 msgG, root = digraph_from_messages(msgfile) 62 treeG = shortest_propagation_tree(msgG, root) 63 pv, W, H = get_flat_pv(pvfile) 64 # 65 dpv = np.zeros(pv.shape, dtype=pv.dtype) 66 i2n = [n - 1 for n in treeG] 67 mdpv = dpv_maskG(treeG, root, pv, i2n) 68 dpv[i2n] = mdpv 69 return mdpv, dpv 70 71 72def downstream_protection_value(out_dir, pvfile): 73 pv, W, H = get_flat_pv(pvfile) 74 dpv = np.zeros(pv.shape, dtype=pv.dtype) 75 file_list = read_files(out_dir) 76 for msgfile in file_list: 77 msgG, root = digraph_from_messages(msgfile) 78 treeG = shortest_propagation_tree(msgG, root) 79 i2n = [n - 1 for n in treeG] # TODO change to list(treeG) 80 mdpv = dpv_maskG(treeG, root, pv, i2n) 81 dpv[i2n] += mdpv 82 # plot_pv( dpv, w=W, h=H) 83 return dpv / len(file_list) 84 85 86def canon3(afile): 87 # NO IMPLEMENTADO 88 G = nx.read_edgelist( 89 path=afile, create_using=nx.DiGraph(), nodetype=np.int32, data=[("time", np.int16)], delimiter="," 90 ) 91 return G 92 93 94def canon4(afile): 95 G = nx.read_edgelist( 96 path=afile, 97 create_using=nx.DiGraph(), 98 nodetype=np.int32, 99 data=[("time", np.int16), ("ros", np.float32)], 100 delimiter=",", 101 ) 102 return G 103 104 105def digraph_from_messages(afile): 106 """Not checking if file exists or if size > 0 107 This is done previously on read_files 108 """ 109 data = np.loadtxt( 110 afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16)], usecols=(0, 1, 2) 111 ) 112 root = data[0][0] # checkar que el primer valor del message sea el punto de ignición 113 G = nx.DiGraph() 114 G.add_weighted_edges_from(data) 115 return G, root 116 117 118func = np.vectorize(lambda x, y: {"time": x, "ros": y}) 119 120 121def custom4(afile): 122 data = np.loadtxt( 123 afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16), ("ros", np.float32)] 124 ) 125 root = data[0][0] 126 G = nx.DiGraph() 127 bunch = np.vstack((data["i"], data["j"], func(data["time"], data["ros"]))).T 128 G.add_edges_from(bunch) 129 return G.add_edges_from(bunch), root 130 131 132def shortest_propagation_tree(G, root): 133 """construct a tree with the all shortest path from root 134 TODO try accumulating already added edges for checking before asigning should be faster? 135 """ 136 # { node : [root,...,node], ... } 137 shortest_paths = nx.single_source_dijkstra_path(G, root, weight="time") 138 del shortest_paths[root] 139 T = nx.DiGraph() 140 for node, shopat in shortest_paths.items(): 141 for i, node in enumerate(shopat[:-1]): 142 T.add_edge(node, shopat[i + 1]) 143 return T 144 145 146def recursiveUp(G): 147 """count up WTF!!! 148 leafs = [x for x in T.nodes if T.out_degree(x)==0] 149 """ 150 for i in G.nodes: 151 G.nodes[i]["dv"] = 1 152 153 # G.nodes[i]['dv']=0 154 # for leaf in (x for x in G.nodes if G.out_degree(x)==0): 155 # G.nodes[leaf]['dv']=1 156 def count_up(G, j): 157 for i in G.predecessors(j): 158 # G.nodes[i]['dv']+=G.nodes[j]['dv'] 159 G.nodes[i]["dv"] += 1 160 print(i, j, G.nodes[i]["dv"]) 161 count_up(G, i) 162 163 for leaf in (x for x in G.nodes if G.out_degree(x) == 0): 164 count_up(G, leaf) 165 166 167def dpv_maskG(G, root, pv, i2n=None): 168 """calculate downstream protection value in a flat protection value raster 169 i2n = [n for n in treeG.nodes] 170 1. copies a slice of pv, only Graph nodes 171 2. recursively sums downstream for all succesors of the graph (starting from root) 172 3. returns the slice (range(len(G) indexed) 173 G must be a tree 174 """ 175 if i2n is None: 176 i2n = [n - 1 for n in treeG] 177 # -1 ok? 178 mdpv = pv[i2n] 179 180 # assert mdpv.base is None ,'the slice is not a copy!' 181 def recursion(G, i, pv, mdpv, i2n): 182 for j in G.successors(i): 183 mdpv[i2n.index(i - 1)] += recursion(G, j, pv, mdpv, i2n) 184 return mdpv[i2n.index(i - 1)] 185 186 recursion(G, root, pv, mdpv, i2n) 187 return mdpv 188 189 190def recursion2(G: DiGraph, i: int, mdpv: ndarray, i2n: list[int]) -> ndarray: 191 for j in G.successors(i): 192 mdpv[i2n.index(i)] += recursion2(G, j, mdpv, i2n) 193 return mdpv[i2n.index(i)] 194 195 196def add_dpv_graph(G, root, pv): 197 """modifies the input Graph recursively: 198 1. sums pv into predecesor (or calling) 199 2. recursively sums downstream for all succesors 200 3. returns itself if no successors 201 G must be a tree with 'dv' 202 hence returns nothing 203 """ 204 for n in G.nodes: 205 G.nodes[n]["dv"] += pv[n - 1] 206 207 def recursion(G, i): 208 for j in G.successors(i): 209 G.nodes[i]["dv"] += recursion(G, j) 210 return G.nodes[i]["dv"] 211 212 recursion(G, root) 213 214 215def sum_dpv_graph(T, root, pv): 216 """returns a copy of T that: 217 1. sets pv into each node 218 2. recursively sums pv downstream 219 T must be a tree (not checked) 220 """ 221 G = T.copy() 222 for i in G.nodes: 223 G.nodes[i]["dv"] = pv[i - 1] 224 225 def recursion(G, i): 226 for j in G.successors(i): 227 G.nodes[i]["dv"] += recursion(G, j) 228 return G.nodes[i]["dv"] 229 230 recursion(G, root) 231 return G 232 233 234def count_downstream_graph(T, root) -> nx.DiGraph: 235 """returns a new Graph with node values of the number of conected nodes downstream""" 236 assert nx.is_tree(T), "not tree" 237 G = T.copy() 238 for i in G.nodes: 239 G.nodes[i]["dv"] = 1 240 241 def recursion(G, i): 242 for j in G.successors(i): 243 G.nodes[i]["dv"] += recursion(G, j) 244 return G.nodes[i]["dv"] 245 246 recursion(G, root) 247 return G 248 249 250def glob_int_sorted(directory: Path, filename: str = "MessagesFile.csv"): 251 """reads all MessagesFile<int>.csv >0 size files from directory, regexes numbers casting to int, sorts them 252 Args: 253 directory (Path): directory to read 254 filename (str): filename to match 255 Returns: 256 list: ordered list of pathlib files 257 """ 258 if not directory.is_dir(): 259 raise NotADirectoryError(f"{directory=} is not a directory") 260 file_name = Path(filename) 261 file_list = [f for f in directory.glob(file_name.stem + "[0-9]*" + file_name.suffix) if f.stat().st_size > 0] 262 if len(file_list) == 0: 263 raise FileNotFoundError(f"{directory=} has no {file_name=} matching >0 size files") 264 file_string = " ".join([f.stem for f in file_list]) 265 # sort 266 sim_num = np.fromiter(re.findall("([0-9]+)", file_string), dtype=int, count=len(file_list)) 267 asort = np.argsort(sim_num) 268 sim_num = sim_num[asort] 269 file_list = list(np.array(file_list)[asort]) 270 return file_list 271 272 273def get_flat_pv(afile): 274 """opens the file with gdal as raster, get 1st band, flattens it 275 also returns width and height 276 """ 277 dataset = gdal.Open(str(afile), gdal.GA_ReadOnly) 278 return dataset.GetRasterBand(1).ReadAsArray().ravel(), dataset.RasterXSize, dataset.RasterYSize 279 280 281def plot(G, pos=None, labels=None): 282 """matplotlib 283 TODO scientific format numeric labels 284 """ 285 if not pos: 286 pos = {node: [*id2xy(node)] for node in G.nodes} 287 if not labels: 288 labes = {node: node for node in G.nodes} 289 plt.figure() 290 nx.draw(G, pos=pos, with_labels=False) 291 nx.draw_networkx_labels(G, pos=pos, labels=labels) 292 return plt.show() 293 294 295def plot_pv(pv, w=40, h=40): 296 mat = pv.reshape(h, w) 297 plt.matshow(mat) 298 plt.show() 299 300 301def worker(data, pv, sid): 302 303 # digraph_from_messages(msgfile) -> msgG, root 304 msgG = DiGraph() 305 msgG.add_weighted_edges_from(data) 306 root = data[0][0] 307 # shortest_propagation_tree(G, root) -> treeG 308 shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time") 309 del shortest_paths[root] 310 treeG = DiGraph() 311 for node, shopat in shortest_paths.items(): 312 for i, node in enumerate(shopat[:-1]): 313 treeG.add_edge(node, shopat[i + 1]) 314 # dpv_maskG(G, root, pv, i2n) -> mdpv 315 i2n = [n for n in treeG] 316 mdpv = pv[i2n] 317 recursion(treeG, root, mdpv, i2n) 318 # dpv[i2n] += mdpv 319 return mdpv, i2n, sid 320 321 322from re import search 323 324 325def load_msg(afile: Path): 326 try: 327 sim_id = search(r"\d+", afile.stem).group(0) 328 except: 329 sim_id = "-1" 330 data = np.loadtxt( 331 afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)], usecols=(0, 1, 2), ndmin=1 332 ) 333 return data, sim_id 334 335 336def get_data(files, callback=None): 337 data = [] 338 for count, afile in enumerate(files): 339 sim_id = search("\\d+", afile.stem).group(0) 340 data += [ 341 np.loadtxt( 342 afile, 343 delimiter=",", 344 dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)], 345 usecols=(0, 1, 2), 346 ndmin=1, 347 ) 348 ] 349 # 1 based to 0 based 350 data[-1]["i"] -= 1 351 data[-1]["j"] -= 1 352 if callback: 353 callback(count, sim_id, afile) 354 yield data 355 with open("messages.pickle", "wb") as f: 356 pickle_dump(data, f) 357 358 359def one_sim_work(afile, pv, sid): 360 # digraph_from_messages(msgfile) -> msgG, root 361 data = np.loadtxt( 362 afile, 363 delimiter=",", 364 dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)], 365 usecols=(0, 1, 2), 366 ndmin=1, 367 ) 368 # 1 based to 0 based 369 data["i"] -= 1 370 data["j"] -= 1 371 msgG = nx.DiGraph() 372 msgG.add_weighted_edges_from(data) 373 root = data[0][0] 374 # shortest_propagation_tree(G, root) -> treeG 375 shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time") 376 del shortest_paths[root] 377 treeG = nx.DiGraph() 378 for node, shopat in shortest_paths.items(): 379 for i, node in enumerate(shopat[:-1]): 380 treeG.add_edge(node, shopat[i + 1]) 381 # dpv_maskG(G, root, pv, i2n) -> mdpv 382 i2n = [n for n in treeG] 383 mdpv = pv[i2n] 384 recursion2(treeG, root, mdpv, i2n) 385 # dpv[i2n] += mdpv 386 print("fin", sid) 387 return mdpv, i2n, sid 388 389 390def argument_parser(argv): 391 """parse arguments 392 - logger: verbosity, logfile 393 """ 394 import argparse 395 396 def path_check(path): 397 path = Path(path) 398 if path.is_dir() or (path.is_file() and path.stat().st_size > 0): 399 return path 400 raise argparse.ArgumentTypeError(f"{path.as_uri()=} is not a valid Path or file size is 0") 401 402 parser = argparse.ArgumentParser( 403 description=__doc__, 404 formatter_class=argparse.RawDescriptionHelpFormatter, 405 ) 406 parser.add_argument( 407 "-v", 408 "--verbosity", 409 help="increase output verbosity (0: warning(default), 1: info, 2>=: debug)", 410 action="count", 411 default=0, 412 ) 413 parser.add_argument("-l", "--logfile", help="rotating log file see fire2a.setup_logger", type=Path) 414 parser.add_argument( 415 "-dpvn", 416 "--dpv-filename", 417 help="dpv raster layer output filename", 418 type=str, 419 default="dpv.tif", 420 ) 421 parser.add_argument( 422 "-pv", 423 "--protection-value", 424 help="pv raster layer, must be gdal.Open compatible, and match messages width & height", 425 type=path_check, 426 ) 427 parser.add_argument( 428 "-pvb", 429 "--protection-value-band", 430 help="pv raster layer band to read, defaults to 1", 431 type=int, 432 default=1, 433 ) 434 parser.add_argument("-W", help="width of the raster (overrides width read from pv raster)", type=int) 435 parser.add_argument("-H", help="height of the raster (overrides width read from pv raster)", type=int) 436 parser.add_argument( 437 nargs="+", 438 dest="messages", 439 help=( 440 "Messages[0-9+].csv file(s) or directory to parse (hint use glob: Messages*.csv to pass many files or pass" 441 " a single directory to read all Messages[0-9+].csv files in it)" 442 ), 443 type=path_check, 444 ) 445 return parser.parse_args(argv) 446 447 448def main(argv=None): 449 """steps to run dpv""" 450 if argv is None: 451 argv = sys.argv[1:] 452 args = argument_parser(argv) 453 logger = setup_logger(__name__, args.verbosity, args.logfile) 454 logger.info(f"{args=}") 455 logger.debug("debugging...") 456 457 file_list = [] 458 for path_str in args.messages: 459 path = Path(path_str) 460 if path.is_file() and path.stat().st_size > 0: 461 file_list += [path] 462 elif path.is_dir(): 463 file_list += glob_int_sorted(path) 464 else: 465 logger.warning(f"{path} is not a file or directory") 466 # remove duplicates 467 file_list = list(dict.fromkeys(file_list)) 468 logger.info(f"{file_list=}") 469 470 if args.protection_value: 471 pv_data, pv_info = read_raster(str(args.protection_value), args.protection_value_band) 472 W, H, GT, PJ = pv_info["RasterXSize"], pv_info["RasterYSize"], pv_info["Transform"], pv_info["Projection"] 473 # make 1D 474 pv = pv_data.ravel() 475 logger.info(f"{pv_data.dtype=}, {pv_data.shape=}, {pv_info=}") 476 dpv = np.zeros(pv.shape, dtype=pv.dtype) 477 elif args.W and args.H: 478 W, H = args.W, args.H 479 GT = (0, 1, 0, 0, 0, 1) 480 PJ = "EPSG:4326" 481 pv = np.ones(W * H, dtype=np.int32) 482 dpv = np.zeros(W * H, dtype=np.int32) 483 484 from multiprocessing import Pool, cpu_count 485 486 with Pool(processes=cpu_count() - 1) as pool: 487 results = [pool.apply_async(one_sim_work, args=(afile, pv, i)) for i, afile in enumerate(file_list)] 488 for result in results: 489 sdpv, si2n, sid = result.get() 490 dpv[si2n] += sdpv 491 # scale 492 dpv = dpv / len(file_list) 493 print(f"{dpv=}") 494 # write 495 dst_ds = gdal.GetDriverByName("GTiff").Create(args.dpv_filename, W, H, 1, gdal.GDT_Float32) 496 # get driver by name to create a geo tiff raster 497 dst_ds.SetGeoTransform(GT) 498 dst_ds.SetProjection(PJ) 499 band = dst_ds.GetRasterBand(1) 500 band.SetUnitType("protection_value") 501 if 0 != band.SetNoDataValue(0): 502 feedback.pushWarning(f"Set No Data failed for {self.OUT_R}") 503 if 0 != band.WriteArray(np.float32(dpv.reshape(H, W))): 504 feedback.pushWarning(f"WriteArray failed for {self.OUT_R}") 505 band.FlushCache() 506 dst_ds.FlushCache() 507 dst_ds = None 508 return 0 509 510 511if __name__ == "__main__": 512 sys.exit(main()) 513 514 515def no(): 516 # if len(sys.argv)>1: 517 # input_dir = sys.argv[1] 518 # output_dir = sys.argv[2] 519 # else: 520 print("run in C2FSB folder") 521 # input_dir = Path.cwd() / 'data' 522 input_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004") 523 # output_dir = Path.cwd() / 'results' 524 output_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004/results") 525 print(input_dir, output_dir) 526 assert input_dir.is_dir() and output_dir.is_dir() 527 528 # abro el directorio de los messages como una lista con los nombres de los archivos 529 file_list = read_files(output_dir) 530 531 # agarrar la capa que ocuparemos como valor a proteger 532 ## pv: valores en riesgo 533 ## W: Width 534 ## H: Height 535 pv, W, H = get_flat_pv(input_dir / "fuels.asc") 536 537 # 538 # single simulation 539 # 540 afile = file_list[0] 541 msgG, root = digraph_from_messages(afile) 542 pos = {node: [*id2xy(node)] for node in msgG} 543 treeG = shortest_propagation_tree(msgG, root) 544 545 # count the number of nodes downstream 546 countG = count_downstream_graph(treeG, root) 547 548 # asignar el número de nodos aguas abajo a cada nodo respectivamente 549 countGv = {n: countG.nodes[n]["dv"] for n in countG} 550 plot(countG, pos=pos, labels=countGv) 551 # {'dv': 137} == 137 root connects all tree 552 assert countG.nodes[root]["dv"] == len(countG), "count_downstream at root is not the same as number of nodes!" 553 # 554 onev = np.ones(pv.shape) 555 # 556 # sum dpv=1 557 sumG = sum_dpv_graph(treeG, root, onev) 558 sumGv = {n: sumG.nodes[n]["dv"] for n in sumG} 559 plot(sumG, pos=pos, labels=sumGv) 560 assert np.all([sumGv[n] == countGv[n] for n in treeG.nodes]), "sum_dpv(pv=1) != countG values!" 561 # 562 # add dpv=1 563 addG = treeG.copy() 564 for n in addG.nodes: 565 addG.nodes[n]["dv"] = 0 566 add_dpv_graph(addG, root, onev) 567 addGv = {n: addG.nodes[n]["dv"] for n in addG} 568 plot(addG, pos=pos, labels=addGv) 569 assert np.all([addGv[n] == countGv[n] for n in treeG.nodes]), "add_dpv(pv=1) != countG values!" 570 # 571 # cum dpv=1 572 dpv = np.zeros(pv.shape, dtype=pv.dtype) 573 i2n = [n - 1 for n in treeG] 574 mdpv = dpv_maskG(treeG, root, onev, i2n) 575 dpv[i2n] = mdpv 576 plot_pv(dpv, w=W, h=H) 577 assert np.all([mdpv[i2n.index(n - 1)] == countGv[n] for n in treeG.nodes]), "dpv_maskG(pv=1) != countG values!" 578 # 579 # single full test 580 mdpv, dpv = single_simulation_downstream_protection_value(msgfile=afile, pvfile=input_dir / "bp.asc") 581 plot_pv(dpv, w=W, h=H) 582 plot(treeG, pos=pos, labels={n: np.format_float_scientific(dpv[n], precision=2) for n in treeG}) 583 assert np.all(np.isclose(mdpv, dpv[i2n])), "dpv_maskG != dpvalues!" 584 # 585 # finally 586 dpv = downstream_protection_value(output_dir, pvfile=input_dir / "bp.asc") 587 plot_pv(dpv, w=W, h=H) 588 589 590""" 591$cd C2FSB 592C2FSB$ python3 downstream_protection_value.py 593 594all functions are tested and plotted on main 595 596Calculate downstream protection value from Messages/MessagesFile<int>.csv files 597Each file has 4 columns: from cellId, to cellId, period when burns & hit ROS 598 599https://github.com/fire2a/C2FK/blob/main/Cell2Fire/Heuristics.py 600 601https://networkx.org/documentation/networkx-1.8/reference/algorithms.shortest_paths.html 602 603propagation tree: (c) fire shortest traveling times 604 605Performance review 6061. is faster to dijkstra than minimun spanning 607 608 In [50]: %timeit shortest_propagation_tree(G,root) 609 1.53 ms ± 5.47 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) 610 611 In [51]: %timeit nx.minimum_spanning_arborescence(G, attr='time') 612 16.4 ms ± 61 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) 613 6142. is faster numpy+add_edges than nx.from_csv 615 616 In [63]: %timeit custom4(afile) 617 2.3 ms ± 32 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) 618 619 In [64]: %timeit canon4(afile) 620 3.35 ms ± 20 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) 621 6222.1 even faster is you discard a column!! 623 In [65]: %timeit digraph_from_messages(afile) 624 1.84 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) 625 626"""
47def id2xy(idx, w=40, h=40): 48 """idx: index, w: width, h:height""" 49 return r_id2xy(idx - 1, w, h)
idx: index, w: width, h:height
60def single_simulation_downstream_protection_value(msgfile="MessagesFile01.csv", pvfile="py.asc"): 61 """load one diGraph count succesors""" 62 msgG, root = digraph_from_messages(msgfile) 63 treeG = shortest_propagation_tree(msgG, root) 64 pv, W, H = get_flat_pv(pvfile) 65 # 66 dpv = np.zeros(pv.shape, dtype=pv.dtype) 67 i2n = [n - 1 for n in treeG] 68 mdpv = dpv_maskG(treeG, root, pv, i2n) 69 dpv[i2n] = mdpv 70 return mdpv, dpv
load one diGraph count succesors
73def downstream_protection_value(out_dir, pvfile): 74 pv, W, H = get_flat_pv(pvfile) 75 dpv = np.zeros(pv.shape, dtype=pv.dtype) 76 file_list = read_files(out_dir) 77 for msgfile in file_list: 78 msgG, root = digraph_from_messages(msgfile) 79 treeG = shortest_propagation_tree(msgG, root) 80 i2n = [n - 1 for n in treeG] # TODO change to list(treeG) 81 mdpv = dpv_maskG(treeG, root, pv, i2n) 82 dpv[i2n] += mdpv 83 # plot_pv( dpv, w=W, h=H) 84 return dpv / len(file_list)
106def digraph_from_messages(afile): 107 """Not checking if file exists or if size > 0 108 This is done previously on read_files 109 """ 110 data = np.loadtxt( 111 afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16)], usecols=(0, 1, 2) 112 ) 113 root = data[0][0] # checkar que el primer valor del message sea el punto de ignición 114 G = nx.DiGraph() 115 G.add_weighted_edges_from(data) 116 return G, root
Not checking if file exists or if size > 0 This is done previously on read_files
122def custom4(afile): 123 data = np.loadtxt( 124 afile, delimiter=",", dtype=[("i", np.int32), ("j", np.int32), ("time", np.int16), ("ros", np.float32)] 125 ) 126 root = data[0][0] 127 G = nx.DiGraph() 128 bunch = np.vstack((data["i"], data["j"], func(data["time"], data["ros"]))).T 129 G.add_edges_from(bunch) 130 return G.add_edges_from(bunch), root
133def shortest_propagation_tree(G, root): 134 """construct a tree with the all shortest path from root 135 TODO try accumulating already added edges for checking before asigning should be faster? 136 """ 137 # { node : [root,...,node], ... } 138 shortest_paths = nx.single_source_dijkstra_path(G, root, weight="time") 139 del shortest_paths[root] 140 T = nx.DiGraph() 141 for node, shopat in shortest_paths.items(): 142 for i, node in enumerate(shopat[:-1]): 143 T.add_edge(node, shopat[i + 1]) 144 return T
construct a tree with the all shortest path from root TODO try accumulating already added edges for checking before asigning should be faster?
147def recursiveUp(G): 148 """count up WTF!!! 149 leafs = [x for x in T.nodes if T.out_degree(x)==0] 150 """ 151 for i in G.nodes: 152 G.nodes[i]["dv"] = 1 153 154 # G.nodes[i]['dv']=0 155 # for leaf in (x for x in G.nodes if G.out_degree(x)==0): 156 # G.nodes[leaf]['dv']=1 157 def count_up(G, j): 158 for i in G.predecessors(j): 159 # G.nodes[i]['dv']+=G.nodes[j]['dv'] 160 G.nodes[i]["dv"] += 1 161 print(i, j, G.nodes[i]["dv"]) 162 count_up(G, i) 163 164 for leaf in (x for x in G.nodes if G.out_degree(x) == 0): 165 count_up(G, leaf)
count up WTF!!! leafs = [x for x in T.nodes if T.out_degree(x)==0]
168def dpv_maskG(G, root, pv, i2n=None): 169 """calculate downstream protection value in a flat protection value raster 170 i2n = [n for n in treeG.nodes] 171 1. copies a slice of pv, only Graph nodes 172 2. recursively sums downstream for all succesors of the graph (starting from root) 173 3. returns the slice (range(len(G) indexed) 174 G must be a tree 175 """ 176 if i2n is None: 177 i2n = [n - 1 for n in treeG] 178 # -1 ok? 179 mdpv = pv[i2n] 180 181 # assert mdpv.base is None ,'the slice is not a copy!' 182 def recursion(G, i, pv, mdpv, i2n): 183 for j in G.successors(i): 184 mdpv[i2n.index(i - 1)] += recursion(G, j, pv, mdpv, i2n) 185 return mdpv[i2n.index(i - 1)] 186 187 recursion(G, root, pv, mdpv, i2n) 188 return mdpv
calculate downstream protection value in a flat protection value raster i2n = [n for n in treeG.nodes]
- copies a slice of pv, only Graph nodes
- recursively sums downstream for all succesors of the graph (starting from root)
- returns the slice (range(len(G) indexed) G must be a tree
197def add_dpv_graph(G, root, pv): 198 """modifies the input Graph recursively: 199 1. sums pv into predecesor (or calling) 200 2. recursively sums downstream for all succesors 201 3. returns itself if no successors 202 G must be a tree with 'dv' 203 hence returns nothing 204 """ 205 for n in G.nodes: 206 G.nodes[n]["dv"] += pv[n - 1] 207 208 def recursion(G, i): 209 for j in G.successors(i): 210 G.nodes[i]["dv"] += recursion(G, j) 211 return G.nodes[i]["dv"] 212 213 recursion(G, root)
modifies the input Graph recursively: 1. sums pv into predecesor (or calling) 2. recursively sums downstream for all succesors 3. returns itself if no successors G must be a tree with 'dv' hence returns nothing
216def sum_dpv_graph(T, root, pv): 217 """returns a copy of T that: 218 1. sets pv into each node 219 2. recursively sums pv downstream 220 T must be a tree (not checked) 221 """ 222 G = T.copy() 223 for i in G.nodes: 224 G.nodes[i]["dv"] = pv[i - 1] 225 226 def recursion(G, i): 227 for j in G.successors(i): 228 G.nodes[i]["dv"] += recursion(G, j) 229 return G.nodes[i]["dv"] 230 231 recursion(G, root) 232 return G
returns a copy of T that: 1. sets pv into each node 2. recursively sums pv downstream T must be a tree (not checked)
235def count_downstream_graph(T, root) -> nx.DiGraph: 236 """returns a new Graph with node values of the number of conected nodes downstream""" 237 assert nx.is_tree(T), "not tree" 238 G = T.copy() 239 for i in G.nodes: 240 G.nodes[i]["dv"] = 1 241 242 def recursion(G, i): 243 for j in G.successors(i): 244 G.nodes[i]["dv"] += recursion(G, j) 245 return G.nodes[i]["dv"] 246 247 recursion(G, root) 248 return G
returns a new Graph with node values of the number of conected nodes downstream
251def glob_int_sorted(directory: Path, filename: str = "MessagesFile.csv"): 252 """reads all MessagesFile<int>.csv >0 size files from directory, regexes numbers casting to int, sorts them 253 Args: 254 directory (Path): directory to read 255 filename (str): filename to match 256 Returns: 257 list: ordered list of pathlib files 258 """ 259 if not directory.is_dir(): 260 raise NotADirectoryError(f"{directory=} is not a directory") 261 file_name = Path(filename) 262 file_list = [f for f in directory.glob(file_name.stem + "[0-9]*" + file_name.suffix) if f.stat().st_size > 0] 263 if len(file_list) == 0: 264 raise FileNotFoundError(f"{directory=} has no {file_name=} matching >0 size files") 265 file_string = " ".join([f.stem for f in file_list]) 266 # sort 267 sim_num = np.fromiter(re.findall("([0-9]+)", file_string), dtype=int, count=len(file_list)) 268 asort = np.argsort(sim_num) 269 sim_num = sim_num[asort] 270 file_list = list(np.array(file_list)[asort]) 271 return file_list
reads all MessagesFile
274def get_flat_pv(afile): 275 """opens the file with gdal as raster, get 1st band, flattens it 276 also returns width and height 277 """ 278 dataset = gdal.Open(str(afile), gdal.GA_ReadOnly) 279 return dataset.GetRasterBand(1).ReadAsArray().ravel(), dataset.RasterXSize, dataset.RasterYSize
opens the file with gdal as raster, get 1st band, flattens it also returns width and height
282def plot(G, pos=None, labels=None): 283 """matplotlib 284 TODO scientific format numeric labels 285 """ 286 if not pos: 287 pos = {node: [*id2xy(node)] for node in G.nodes} 288 if not labels: 289 labes = {node: node for node in G.nodes} 290 plt.figure() 291 nx.draw(G, pos=pos, with_labels=False) 292 nx.draw_networkx_labels(G, pos=pos, labels=labels) 293 return plt.show()
matplotlib TODO scientific format numeric labels
302def worker(data, pv, sid): 303 304 # digraph_from_messages(msgfile) -> msgG, root 305 msgG = DiGraph() 306 msgG.add_weighted_edges_from(data) 307 root = data[0][0] 308 # shortest_propagation_tree(G, root) -> treeG 309 shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time") 310 del shortest_paths[root] 311 treeG = DiGraph() 312 for node, shopat in shortest_paths.items(): 313 for i, node in enumerate(shopat[:-1]): 314 treeG.add_edge(node, shopat[i + 1]) 315 # dpv_maskG(G, root, pv, i2n) -> mdpv 316 i2n = [n for n in treeG] 317 mdpv = pv[i2n] 318 recursion(treeG, root, mdpv, i2n) 319 # dpv[i2n] += mdpv 320 return mdpv, i2n, sid
337def get_data(files, callback=None): 338 data = [] 339 for count, afile in enumerate(files): 340 sim_id = search("\\d+", afile.stem).group(0) 341 data += [ 342 np.loadtxt( 343 afile, 344 delimiter=",", 345 dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)], 346 usecols=(0, 1, 2), 347 ndmin=1, 348 ) 349 ] 350 # 1 based to 0 based 351 data[-1]["i"] -= 1 352 data[-1]["j"] -= 1 353 if callback: 354 callback(count, sim_id, afile) 355 yield data 356 with open("messages.pickle", "wb") as f: 357 pickle_dump(data, f)
360def one_sim_work(afile, pv, sid): 361 # digraph_from_messages(msgfile) -> msgG, root 362 data = np.loadtxt( 363 afile, 364 delimiter=",", 365 dtype=[("i", np.int32), ("j", np.int32), ("t", np.int32)], 366 usecols=(0, 1, 2), 367 ndmin=1, 368 ) 369 # 1 based to 0 based 370 data["i"] -= 1 371 data["j"] -= 1 372 msgG = nx.DiGraph() 373 msgG.add_weighted_edges_from(data) 374 root = data[0][0] 375 # shortest_propagation_tree(G, root) -> treeG 376 shortest_paths = nx.single_source_dijkstra_path(msgG, root, weight="time") 377 del shortest_paths[root] 378 treeG = nx.DiGraph() 379 for node, shopat in shortest_paths.items(): 380 for i, node in enumerate(shopat[:-1]): 381 treeG.add_edge(node, shopat[i + 1]) 382 # dpv_maskG(G, root, pv, i2n) -> mdpv 383 i2n = [n for n in treeG] 384 mdpv = pv[i2n] 385 recursion2(treeG, root, mdpv, i2n) 386 # dpv[i2n] += mdpv 387 print("fin", sid) 388 return mdpv, i2n, sid
391def argument_parser(argv): 392 """parse arguments 393 - logger: verbosity, logfile 394 """ 395 import argparse 396 397 def path_check(path): 398 path = Path(path) 399 if path.is_dir() or (path.is_file() and path.stat().st_size > 0): 400 return path 401 raise argparse.ArgumentTypeError(f"{path.as_uri()=} is not a valid Path or file size is 0") 402 403 parser = argparse.ArgumentParser( 404 description=__doc__, 405 formatter_class=argparse.RawDescriptionHelpFormatter, 406 ) 407 parser.add_argument( 408 "-v", 409 "--verbosity", 410 help="increase output verbosity (0: warning(default), 1: info, 2>=: debug)", 411 action="count", 412 default=0, 413 ) 414 parser.add_argument("-l", "--logfile", help="rotating log file see fire2a.setup_logger", type=Path) 415 parser.add_argument( 416 "-dpvn", 417 "--dpv-filename", 418 help="dpv raster layer output filename", 419 type=str, 420 default="dpv.tif", 421 ) 422 parser.add_argument( 423 "-pv", 424 "--protection-value", 425 help="pv raster layer, must be gdal.Open compatible, and match messages width & height", 426 type=path_check, 427 ) 428 parser.add_argument( 429 "-pvb", 430 "--protection-value-band", 431 help="pv raster layer band to read, defaults to 1", 432 type=int, 433 default=1, 434 ) 435 parser.add_argument("-W", help="width of the raster (overrides width read from pv raster)", type=int) 436 parser.add_argument("-H", help="height of the raster (overrides width read from pv raster)", type=int) 437 parser.add_argument( 438 nargs="+", 439 dest="messages", 440 help=( 441 "Messages[0-9+].csv file(s) or directory to parse (hint use glob: Messages*.csv to pass many files or pass" 442 " a single directory to read all Messages[0-9+].csv files in it)" 443 ), 444 type=path_check, 445 ) 446 return parser.parse_args(argv)
parse arguments
- logger: verbosity, logfile
449def main(argv=None): 450 """steps to run dpv""" 451 if argv is None: 452 argv = sys.argv[1:] 453 args = argument_parser(argv) 454 logger = setup_logger(__name__, args.verbosity, args.logfile) 455 logger.info(f"{args=}") 456 logger.debug("debugging...") 457 458 file_list = [] 459 for path_str in args.messages: 460 path = Path(path_str) 461 if path.is_file() and path.stat().st_size > 0: 462 file_list += [path] 463 elif path.is_dir(): 464 file_list += glob_int_sorted(path) 465 else: 466 logger.warning(f"{path} is not a file or directory") 467 # remove duplicates 468 file_list = list(dict.fromkeys(file_list)) 469 logger.info(f"{file_list=}") 470 471 if args.protection_value: 472 pv_data, pv_info = read_raster(str(args.protection_value), args.protection_value_band) 473 W, H, GT, PJ = pv_info["RasterXSize"], pv_info["RasterYSize"], pv_info["Transform"], pv_info["Projection"] 474 # make 1D 475 pv = pv_data.ravel() 476 logger.info(f"{pv_data.dtype=}, {pv_data.shape=}, {pv_info=}") 477 dpv = np.zeros(pv.shape, dtype=pv.dtype) 478 elif args.W and args.H: 479 W, H = args.W, args.H 480 GT = (0, 1, 0, 0, 0, 1) 481 PJ = "EPSG:4326" 482 pv = np.ones(W * H, dtype=np.int32) 483 dpv = np.zeros(W * H, dtype=np.int32) 484 485 from multiprocessing import Pool, cpu_count 486 487 with Pool(processes=cpu_count() - 1) as pool: 488 results = [pool.apply_async(one_sim_work, args=(afile, pv, i)) for i, afile in enumerate(file_list)] 489 for result in results: 490 sdpv, si2n, sid = result.get() 491 dpv[si2n] += sdpv 492 # scale 493 dpv = dpv / len(file_list) 494 print(f"{dpv=}") 495 # write 496 dst_ds = gdal.GetDriverByName("GTiff").Create(args.dpv_filename, W, H, 1, gdal.GDT_Float32) 497 # get driver by name to create a geo tiff raster 498 dst_ds.SetGeoTransform(GT) 499 dst_ds.SetProjection(PJ) 500 band = dst_ds.GetRasterBand(1) 501 band.SetUnitType("protection_value") 502 if 0 != band.SetNoDataValue(0): 503 feedback.pushWarning(f"Set No Data failed for {self.OUT_R}") 504 if 0 != band.WriteArray(np.float32(dpv.reshape(H, W))): 505 feedback.pushWarning(f"WriteArray failed for {self.OUT_R}") 506 band.FlushCache() 507 dst_ds.FlushCache() 508 dst_ds = None 509 return 0
steps to run dpv
516def no(): 517 # if len(sys.argv)>1: 518 # input_dir = sys.argv[1] 519 # output_dir = sys.argv[2] 520 # else: 521 print("run in C2FSB folder") 522 # input_dir = Path.cwd() / 'data' 523 input_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004") 524 # output_dir = Path.cwd() / 'results' 525 output_dir = Path("/home/fdo/source/C2F-W3/data/Vilopriu_2013/firesim_231008_110004/results") 526 print(input_dir, output_dir) 527 assert input_dir.is_dir() and output_dir.is_dir() 528 529 # abro el directorio de los messages como una lista con los nombres de los archivos 530 file_list = read_files(output_dir) 531 532 # agarrar la capa que ocuparemos como valor a proteger 533 ## pv: valores en riesgo 534 ## W: Width 535 ## H: Height 536 pv, W, H = get_flat_pv(input_dir / "fuels.asc") 537 538 # 539 # single simulation 540 # 541 afile = file_list[0] 542 msgG, root = digraph_from_messages(afile) 543 pos = {node: [*id2xy(node)] for node in msgG} 544 treeG = shortest_propagation_tree(msgG, root) 545 546 # count the number of nodes downstream 547 countG = count_downstream_graph(treeG, root) 548 549 # asignar el número de nodos aguas abajo a cada nodo respectivamente 550 countGv = {n: countG.nodes[n]["dv"] for n in countG} 551 plot(countG, pos=pos, labels=countGv) 552 # {'dv': 137} == 137 root connects all tree 553 assert countG.nodes[root]["dv"] == len(countG), "count_downstream at root is not the same as number of nodes!" 554 # 555 onev = np.ones(pv.shape) 556 # 557 # sum dpv=1 558 sumG = sum_dpv_graph(treeG, root, onev) 559 sumGv = {n: sumG.nodes[n]["dv"] for n in sumG} 560 plot(sumG, pos=pos, labels=sumGv) 561 assert np.all([sumGv[n] == countGv[n] for n in treeG.nodes]), "sum_dpv(pv=1) != countG values!" 562 # 563 # add dpv=1 564 addG = treeG.copy() 565 for n in addG.nodes: 566 addG.nodes[n]["dv"] = 0 567 add_dpv_graph(addG, root, onev) 568 addGv = {n: addG.nodes[n]["dv"] for n in addG} 569 plot(addG, pos=pos, labels=addGv) 570 assert np.all([addGv[n] == countGv[n] for n in treeG.nodes]), "add_dpv(pv=1) != countG values!" 571 # 572 # cum dpv=1 573 dpv = np.zeros(pv.shape, dtype=pv.dtype) 574 i2n = [n - 1 for n in treeG] 575 mdpv = dpv_maskG(treeG, root, onev, i2n) 576 dpv[i2n] = mdpv 577 plot_pv(dpv, w=W, h=H) 578 assert np.all([mdpv[i2n.index(n - 1)] == countGv[n] for n in treeG.nodes]), "dpv_maskG(pv=1) != countG values!" 579 # 580 # single full test 581 mdpv, dpv = single_simulation_downstream_protection_value(msgfile=afile, pvfile=input_dir / "bp.asc") 582 plot_pv(dpv, w=W, h=H) 583 plot(treeG, pos=pos, labels={n: np.format_float_scientific(dpv[n], precision=2) for n in treeG}) 584 assert np.all(np.isclose(mdpv, dpv[i2n])), "dpv_maskG != dpvalues!" 585 # 586 # finally 587 dpv = downstream_protection_value(output_dir, pvfile=input_dir / "bp.asc") 588 plot_pv(dpv, w=W, h=H)