Modules

The source code is in a folder named app. There are only two source files in the folder, one (main.py) for the FastAPI application and one (optipass.py) that provides an abstract interface for running OptiPass.

app
├── main.py
└── optipass.py

main.py

init

Define global variables used in the rest of the application: paths to static data and names of static data files, a list of names of projects, a dictionary of region names for each project.

Source code in app/main.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def init():
    '''
    Define global variables used in the rest of the application:
    paths to static data and names of static data files, a list of 
    names of projects, a dictionary of region names for each project.
    '''

    global BARRIERS, BARRIER_FILE
    global MAPS, MAPINFO_FILE
    global TARGETS, TARGET_FILE, LAYOUT_FILE
    global COLNAMES, COLNAME_FILE
    global HTMLDIR, IMAGEDIR

    MAPS = 'static/maps'
    MAPINFO_FILE = 'mapinfo.json'

    BARRIERS = 'static/barriers'
    BARRIER_FILE = 'barriers.csv'

    TARGETS = 'static/targets'
    TARGET_FILE = 'targets.csv'
    LAYOUT_FILE = 'layout.txt'

    COLNAMES = 'static/colnames'
    COLNAME_FILE = 'colnames.csv'

    HTMLDIR = 'static/html'
    # IMAGEDIR = 'static/images'

    global project_names, region_names

    logging.basicConfig(
        level=logging.INFO,
        style='{',
        format='{message}',
        handlers = [RichHandler(markup=True, rich_tracebacks=True)],
    )

    project_names = [p.stem for p in Path(BARRIERS).iterdir()]
    logging.info(f'projects: {project_names}')

    region_names = { }
    for project in project_names:
        barrier_file = Path(BARRIERS) / project / BARRIER_FILE
        with open(barrier_file) as f:
            f.readline()     # skip the header
            region_names[project] = { rec.split(',')[1] for rec in f }
    logging.info(f'regions: {region_names}')

read_text_file

Read a text file from one of the static subdirectories.

Parameters:
  • project (str) –

    the project name

  • area (str) –

    the data area (barriers, targets, colnames)

  • fn (str) –

    the name of the file within the data area

Returns:
  • str

    the contents of the file, as a single string

Source code in app/main.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def read_text_file(project: str, area: str, fn: str) -> str:
    '''
    Read a text file from one of the static subdirectories.

    Args:
        project:  the project name
        area:  the data area (barriers, targets, colnames)
        fn:  the name of the file within the data area

    Returns:
        the contents of the file, as a single string
    '''
    p = Path(area) / project / fn
    if not p.exists():
        raise FileNotFoundError(p)
    logging.info(f'reading text file: {p}')

    with open(p) as f:
        return f.read().rstrip()

projects

Respond to GET requests of the form /projects.

Returns:
  • list[str]

    a list of the names of the projects (datasets) managed by the server.

Source code in app/main.py
101
102
103
104
105
106
107
108
109
@app.get("/projects")
async def projects() -> list[str]:
    '''
    Respond to GET requests of the form `/projects`.

    Returns:
        a list of the names of the projects (datasets) managed by the server.
    '''
    return project_names

barriers

Respond to GET requests of the form /barriers/P where P is a project name.

Returns:
  • dict

    the barrier data file for a project, as one long string.

Source code in app/main.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@app.get("/barriers/{project}")
async def barriers(project: str) -> dict:
    '''
    Respond to GET requests of the form `/barriers/P` where P is a project name.

    Returns:
        the barrier data file for a project, as one long string.
    '''
    if project not in project_names:
        raise HTTPException(status_code=404, detail=f'barriers: unknown project: {project}')
    try:
        barriers = read_text_file(project, BARRIERS, BARRIER_FILE)
        return {'project': project, 'barriers': barriers}
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f'file not found: {BARRIERS}/{BARRIER_FILE}')
    except Exception as err:
        raise HTTPException(status_code=500, detail=f'server error: {err}')

mapinfo

Respond to GET requests of the form /mapinfo/P where P is a project name.

Returns:
  • dict

    a dictionary (JSON format) with settings for displaying the map for a project.

Source code in app/main.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@app.get("/mapinfo/{project}")
async def mapinfo(project: str) -> dict:
    '''
    Respond to GET requests of the form `/mapinfo/P` where P is a project name.

    Returns:
        a dictionary (JSON format) with settings for displaying the map for a project.
    '''
    if project not in project_names:
        raise HTTPException(status_code=404, detail=f'mapinfo: unknown project: {project}')
    try:
        info = read_text_file(project, MAPS, MAPINFO_FILE)
        return {'project': project, 'mapinfo': info}
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f'file not found: {MAPS}/{MAPINFO_FILE}')
    except Exception as err:
        raise HTTPException(status_code=500, detail=f'server error: {err}')

targets

Respond to GET requests of the form /targets/P where P is a project name.

Returns:
  • dict

    the CSV file containing restoration target descriptions for a project

  • dict

    and a plain text file containing the layout in the GUI

Source code in app/main.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@app.get("/targets/{project}")
async def targets(project: str) -> dict:
    '''
    Respond to GET requests of the form `/targets/P` where P is a project name.

    Returns:
        the CSV file containing restoration target descriptions for a project
        and a plain text file containing the layout in the GUI
    '''
    if project not in project_names:
        raise HTTPException(status_code=404, detail=f'targets: unknown project: {project}')
    try:
        targets = read_text_file(project, TARGETS, TARGET_FILE)
        layout = read_text_file(project, TARGETS, LAYOUT_FILE)
        return {'project': project, 'targets': targets, 'layout': layout}
    except FileNotFoundError as err:
        raise HTTPException(status_code=404, detail=str(err))
    except Exception as err:
        raise HTTPException(status_code=500, detail=f'server error: {err}')

colnames

Respond to GET requests of the form /colnames/P where P is a project name.

Returns:
  • dict

    a dictionary with two entries, the name of the mapping and the names of the colname files

Source code in app/main.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@app.get("/colnames/{project}")
async def colnames(project: str) -> dict:
    '''
    Respond to GET requests of the form `/colnames/P` where P is a project name.

    Returns:
        a dictionary with two entries, the name of the mapping and the names of the colname files
    '''
    try:
        assert project in project_names, f'unknown project: {project}'
        cname_dir = Path(COLNAMES) / project
        cname_file = cname_dir / COLNAME_FILE
        if cname_file.is_file():
            return { 'name': None, 'files': [COLNAME_FILE]}
        elif cname_dir.is_dir():
            alts = list(cname_dir.iterdir())
            assert len(alts) == 1, f'colnames/{project} should have exactly one folder'
            alt_name = alts[0]
            assert alt_name.is_dir(), f'no directory for {alt_name}'
            cnames = [p.stem for p in alt_name.iterdir() if p.suffix == '.csv']
            return { 'name': alt_name.name, 'files': cnames }
        else:
            assert False, f'file not found: {cname_file}'
    except Exception as err:
        raise HTTPException(status_code=404, detail=f'colnames: {err}')

optipass

A GET request of the form /optipass/project?ARGS runs OptiPass using the parameter values passed in the URL.

Parameters:
  • project (str) –

    the name of the project (used to make path to static files)

  • regions (Annotated[list[str], Query()]) –

    comma-separated string of region names

  • targets (Annotated[list[str], Query()]) –

    comma-separated string of 2-letter target IDs

  • budgets (Annotated[list[int], Query()]) –

    a list with starting budget, increment, and count

  • weights (Annotated[list[int] | None, Query()], default: None ) –

    list of ints, one for each target (optional)

  • mapping (Annotated[list[str] | None, Query()], default: None ) –

    project-specific target names, e.g. current or future (optional)

  • tempdir (Annotated[str | None, Query()], default: None ) –

    directory that has existing results (optional, used in testing)

Returns:
  • dict

    a dictionary with a status indicator and a token that can be used to fetch results.

Source code in app/main.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
@app.get("/optipass/{project}")
async def optipass(
    project: str, 
    regions: Annotated[list[str], Query()], 
    budgets: Annotated[list[int], Query()],
    targets: Annotated[list[str], Query()], 
    weights: Annotated[list[int] | None, Query()] = None, 
    mapping: Annotated[list[str] | None, Query()] = None,
    tempdir: Annotated[str | None, Query()] = None,
)-> dict:
    '''
    A GET request of the form `/optipass/project?ARGS` runs OptiPass using the parameter 
    values passed in the URL.

    Args:
        project:  the name of the project (used to make path to static files)
        regions:  comma-separated string of region names
        targets:  comma-separated string of 2-letter target IDs
        budgets:  a list with starting budget, increment, and count
        weights:  list of ints, one for each target (optional)
        mapping:  project-specific target names, e.g. `current` or `future` (optional)
        tempdir:  directory that has existing results (optional, used in testing)

    Returns:
        a dictionary with a status indicator and a token that can be used to fetch results.
    '''
    logging.debug(f'project {project}')
    logging.debug(f'regions {regions}')
    logging.debug(f'budgets {budgets}')
    logging.debug(f'targets {targets}')
    logging.debug(f'weights {weights}')
    logging.debug(f'mapping {mapping}')
    logging.debug(f'tempdir {tempdir}')

    try:
        assert project in project_names, f'unknown project: {project}'

        barrier_path = Path(BARRIERS) / project
        target_file = Path(TARGETS) / project / TARGET_FILE

        cname_dir = Path(COLNAMES) / project
        if mapping is None:
            cname_file = cname_dir / COLNAME_FILE
        else:
            cname_file = cname_dir / mapping[0] / f'{mapping[1]}.csv'

        summary, matrix = run_optipass(
            barrier_path, 
            target_file,
            cname_file,
            regions,
            budgets,
            targets, 
            weights,
            tempdir,
        )

        return {
            'summary': summary.to_csv(),
            'matrix': matrix.to_csv(),
        }

    except AssertionError as err:
        raise HTTPException(status_code=404, detail=f'optipass: {err}')
    except NotImplementedError:
        raise HTTPException(status_code=501, detail=f'OptiPassMain.exe not found')
    except RuntimeError as err:
        raise HTTPException(status_code=500, detail=str(err))
    except Exception as err:
        logging.exception(err)
        raise HTTPException(status_code=500, detail=f'server error: {err}')

optipass.py

optipass_is_installed

Make sure OptiPass is installed.

Returns:
  • bool

    True if OptiPass is installed and this host can run it.

Source code in app/optipass.py
17
18
19
20
21
22
23
24
def optipass_is_installed() -> bool:
    '''
    Make sure OptiPass is installed.

    Returns:
       True if OptiPass is installed and this host can run it.
    '''
    return Path('./bin/OptiPassMain.exe') and ((platform.system() == 'Windows') or os.environ.get('WINEARCH'))

run_optipass

Run OptiPass using the specified arguments. Instantiates an OP object with paths to data files, calls methods that create the input file, run the optimizer, and gather the results.

Parameters:
  • barrier_path (str) –

    name of directory with CSVs files for tide gate data

  • target_file (str) –

    name of a CSV file with restoration target descriptions

  • mapping_file (str) –

    name of CSV file with barrier passabilities

  • regions (list[str]) –

    a list of geographic regions (river names) to use

  • budgets (list[int]) –

    a list with starting budget, budget increment, and number of budgets

  • targets (list[str]) –

    a list of IDs of targets to use

  • weights (list[int]) –

    a list of target weights

  • tmpdir (Path | None, default: None ) –

    name of directory that has existing results (used for testing)

Returns:
  • tuple

    a tuple containing two data frames, a budget table and a gate matrix

Source code in app/optipass.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def run_optipass(
        barrier_path: str, 
        target_file: str,
        mapping_file: str, 
        regions: list[str],
        budgets: list[int],
        targets: list[str], 
        weights: list[int],
        tmpdir: Path | None = None,
    ) -> tuple:
    '''
    Run OptiPass using the specified arguments.  Instantiates an OP object
    with paths to data files, calls methods that create the input file,
    run the optimizer, and gather the results.

    Arguments:
        barrier_path: name of directory with CSVs files for tide gate data
        target_file: name of a CSV file with restoration target descriptions
        mapping_file: name of CSV file with barrier passabilities
        regions: a list of geographic regions (river names) to use
        budgets: a list with starting budget, budget increment, and number of budgets
        targets: a list of IDs of targets to use
        weights: a list of target weights
        tmpdir: name of directory that has existing results (used for testing)

    Returns:
        a tuple containing two data frames, a budget table and a gate matrix
    '''
    op = OptiPass(barrier_path, target_file, mapping_file, regions, targets, weights, tmpdir)
    op.create_input_frame()
    op.create_paths()
    op.run(*budgets)
    return op.collect_results()

OptiPass

An instance of this class has all the data and methods required to respond to an optipass request.

The entry point that runs OptiPass has query parameters that specify how many budget levels to explore. We need to run OptiPass.exe once for each budget level, then collect the results.

The general workflow:

  • create an instance of this class, passing the constructor the parameter values for the regions, targets, and budget levels
  • call a method to generate the input file (called a "barrier file" in the OP documentation) that will be read as input each time OP runs
  • call the method that finds downstream barriers
  • call the method that runs OP
  • generate the output tables and plots

All of the intermediate data needed for these steps is saved in instance vars of the object.

Parameters:
  • barriers (str) –

    folder with barrier definitions

  • tfile (str) –

    name of file with target descriptions

  • mfile (str) –

    name of file with target benefits

  • rlist (list[str]) –

    list of region names

  • tlist (list[str]) –

    list of target names

  • weights (list[int] | None, default: None ) –

    list of target weights (optional)

  • tmpdir (str | None, default: None ) –

    path to output files (optional, used by unit tests)

Source code in app/optipass.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def __init__(self, 
        barriers: str, 
        tfile: str, 
        mfile: str, 
        rlist: list[str], 
        tlist: list[str], 
        weights: list[int] | None = None, 
        tmpdir: str | None = None):
    '''
    Instantiate a new OP object.

    Arguments:
      barriers: folder with barrier definitions
      tfile: name of file with target descriptions
      mfile: name of file with target benefits
      rlist: list of region names
      tlist: list of target names
      weights:  list of target weights (optional)
      tmpdir:  path to output files (optional, used by unit tests)
    '''

    bf = pd.read_csv(barriers/'barriers.csv')
    self.barriers = bf[bf.region.isin(rlist)]

    pf = pd.read_csv(barriers/'passability.csv')
    self.passability = pf[pf.ID.isin(self.barriers.ID)]

    tf = pd.read_csv(tfile).set_index('abbrev')
    assert all(t in tf.index for t in tlist), f'unknown target name in {tlist}'
    self.targets = tf[tf.index.isin(tlist)]

    mf = pd.read_csv(mfile).set_index('abbrev')
    self.mapping = mf[mf.index.isin(tlist)]

    self.set_target_weights(weights)

    self.tmpdir = Path(tmpdir) if tmpdir else None
    self.input_frame = None
    self.paths = None
    self.summary = None
    self.matrix = None

create_input_frame()

Build a data frame that has the rows that will be passed to OptiPass. This frame is basically a subset of the columns of the barrier frame, using column names defined in the targets frame. The frame is saved as an instance variable of this object.

Source code in app/optipass.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def create_input_frame(self):
    '''
    Build a data frame that has the rows that will be passed to OptiPass. This
    frame is basically a subset of the columns of the barrier frame, using column
    names defined in the targets frame.  The frame is saved as an instance variable
    of this object.
    '''

    # Initialize the output frame (df) with the ID and region columns 
    # from the data set 

    df = self.barriers[['ID','region']]
    header = ['ID','REG']

    # The FOCUS column is all 1's
    df = pd.concat([df, pd.Series(np.ones(len(self.barriers)), name='FOCUS', dtype=int, index=self.barriers.index)], axis=1)
    header.append('FOCUS')

    # Copy the downstream ID column
    df = pd.concat([df, self.barriers['DSID']], axis=1)
    header.append('DSID')

    # Add habitat column for each target.  The name of the column to copy is
    # in the mapping frame, the column to copy is in the passability frame
    for t in self.targets.index:
        col = self.passability[self.mapping.loc[t,'habitat']]
        df = pd.concat([df,col], axis=1)
        header.append('HAB_'+t)

    # Same, but for pre-mitigation passage values
    for t in self.targets.index:
        col = self.passability[self.mapping.loc[t,'prepass']]
        df = pd.concat([df,col], axis=1)
        header.append('PRE_'+t)

    # Copy the NPROJ column (1 if a gate is used, 0 if not)
    df = pd.concat([df, self.barriers['NPROJ']], axis=1)
    header.append('NPROJ')

    # The ACTION column is always all 0 (we consider only one scenario)
    df = pd.concat([df, pd.Series(np.zeros(len(self.barriers)), name='ACTION', dtype=int, index=self.barriers.index)], axis=1)
    header.append('ACTION')

    # Copy the cost to fix a gate
    df = pd.concat([df, self.barriers['cost']], axis=1)
    header += ['COST']

    # Same logic as above, copy the post-mitigation passage for each target
    for t in self.targets.index:
        col = self.passability[self.mapping.loc[t,'postpass']]
        df = pd.concat([df,col], axis=1)
        header.append('POST_'+t)

    # All done making the data -- use the new column headers and save the frame
    df.columns = header
    self.input_frame = df

create_paths()

Create paths downstream from each gate (the paths will be used to compute cumulative passability). The paths are saved in an instance variable.

Source code in app/optipass.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def create_paths(self):
    '''
    Create paths downstream from each gate (the paths will be 
    used to compute cumulative passability).  The paths are
    saved in an instance variable.
    '''
    df = self.input_frame

    G = nx.from_pandas_edgelist(
        df[df.DSID.notnull()], 
        source='ID', 
        target='DSID', 
        create_using=nx.DiGraph
    )

    for x in df[df.DSID.isnull()].ID:
        G.add_node(x)
    self.paths = { n: self.path_from(n,G) for n in G.nodes }

path_from(x, graph)

Helper function used to create paths -- return a list of nodes in the path from x to a downstream barrier that has no descendants.

Parameters:
  • x (str) –

    a barrier ID

  • graph (DiGraph) –

    a digraph based on downstream IDs

Returns:
  • list

    a list of all barriers downstream from x

Source code in app/optipass.py
201
202
203
204
205
206
207
208
209
210
211
212
213
def path_from(self, x: str, graph: nx.DiGraph) -> list:
    '''
    Helper function used to create paths -- return a list of nodes in the path 
    from `x` to a downstream barrier that has no descendants.

    Arguments:
      x: a barrier ID
      graph: a digraph based on downstream IDs

    Returns:
      a list of all barriers downstream from x
    '''
    return [x] + [child for _, child in nx.dfs_edges(graph,x)]

set_target_weights(weights)

Create the target weight values that will be passed on the command line when OptiPass is run. If the list is None set each weight to 1. Weights are saved in an instance variable.

Parameters:
  • weights (list[int] | None) –

    None if the user did not specify weighrs, otherwise the list of integer weights from the GUI

Source code in app/optipass.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def set_target_weights(self, weights: list[int] | None):
    '''
    Create the target weight values that will be passed on the command line when
    OptiPass is run.  If the list is None set each weight to 1.  Weights
    are saved in an instance variable.

    Arguments:
      weights:  None if the user did not specify weighrs, otherwise the list of integer weights from the GUI
    '''
    if weights:
        self.weights = weights
        self.weighted = True
    else:
        self.weights = [1] * len(self.targets)
        self.weighted = False

run(bmin, bdelta, bcount)

Run Optipass once for each budget level. Create the shell commands and run them. Outputs are saved in a temp directory.

Parameters:
  • bmin (int) –

    starting budget level

  • bdelta (int) –

    budget increment

  • bcount (int) –

    number of budgets

Note:

  • for unit tests the outputs are already in the temp directory so OptiPass isn't run
  • when running OptiPass run it once with a budget of $0 and then once for each budget level
Source code in app/optipass.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def run(self, bmin: int, bdelta: int, bcount: int):
    '''
    Run Optipass once for each budget level.  Create the shell commands and
    run them.  Outputs are saved in a temp directory.

    Arguments:
      bmin:  starting budget level
      bdelta:  budget increment
      bcount:  number of budgets

    Note:

    * for unit tests the outputs are already in the temp directory so OptiPass isn't run
    * when running OptiPass run it once with a budget of $0 and then once for each budget level
    '''
    if self.tmpdir is not None:
        logging.info(f'Using saved results in {self.tmpdir}')
        return

    if not optipass_is_installed():
        raise NotImplementedError('OptiPassMain.exe not found')

    self.tmpdir = Path(tempfile.mkdtemp(prefix='op', dir='tmp'))
    barrier_file = self.tmpdir / 'input.txt'
    self.input_frame.to_csv(barrier_file, index=False, sep='\t', lineterminator=os.linesep, na_rep='NA')

    template = 'bin\\OptiPassMain.exe -f {bf} -o {of} -b {n}'

    budget = bmin
    for i in range(bcount+1):
        outfile = self.tmpdir / f'output_{i}.txt'
        cmnd = template.format(bf=barrier_file, of=outfile, n=budget)
        if (num_targets := len(self.targets)) > 1:
            cmnd += ' -t {}'.format(num_targets)
            cmnd += ' -w ' + ', '.join([str(n) for n in self.weights])
        res = subprocess.run(cmnd, shell=True, capture_output=True)
        logging.info(cmnd)
        resp = res.stdout.decode()
        if re.search(r'error', resp, re.I):
            logging.error(f'OptiPassMain.exe: {resp}')
            raise RuntimeError(resp)
        budget += bdelta

    n = len(list(self.tmpdir.glob('output*.txt'))) 
    if n < bcount+1:
        raise RuntimeError(f'No output for {bcount-n} of {bcount} optimizations')

collect_results()

OptiPass makes one output file for each budget level. Iterate over those files to gather results into a pair of data frames.

Returns:
  • tuple

    a tuple with two data frames, one for budgets, the other for barriers

Source code in app/optipass.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def collect_results(self) -> tuple:
    '''
    OptiPass makes one output file for each budget level.  Iterate
    over those files to gather results into a pair of data frames. 

    Returns:
      a tuple with two data frames, one for budgets, the other for barriers 
    '''
    cols = { x: [] for x in ['budget', 'habitat', 'gates']}
    for fn in sorted(self.tmpdir.glob('output_*.txt'), key=lambda p: int(p.stem[7:])):
        self.parse_output(fn, cols)
    self.summary = pd.DataFrame(cols)

    dct = {}
    for i in range(len(self.summary)):
        b = int(self.summary.budget[i])
        dct[b] = [ 1 if g in self.summary.gates[i] else 0 for g in self.input_frame.ID]
    self.matrix = pd.DataFrame(dct, index=self.input_frame.ID)
    self.matrix['count'] = self.matrix.sum(axis=1)
    self.add_potential_habitat()

    return self.summary, self.matrix

parse_output(fn, dct)

Parse an output file, appending results to the lists in dct. We need to handle two different formats, depending on whether there was one target or more than one. Values extracted from a file are appended to the lists passed in the dictionary argument.

Parameters:
  • fn (str) –

    the name of the file to parse

  • dct (dict) –

    a dictionary containing lists for budget, habitat, and gate values

Source code in app/optipass.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def parse_output(self, fn: str, dct: dict):
    '''
    Parse an output file, appending results to the lists in dct.  We need to 
    handle two different formats, depending on whether there was one target 
    or more than one.  Values extracted from a file are appended to the lists 
    passed in the dictionary argument.

    Arguments:
      fn: the name of the file to parse
      dct: a dictionary containing lists for budget, habitat, and gate values
    '''

    def parse_header_line(line, tag):
        tokens = line.strip().split()
        if not tokens[0].startswith(tag):
            return None
        return tokens[1]

    logging.debug(f'parsing {fn}')
    with open(fn) as f:
        amount = parse_header_line(f.readline(), 'BUDGET')
        dct['budget'].append(float(amount))
        if parse_header_line(f.readline(), 'STATUS') == 'NO_SOLN':
            raise RuntimeError('No solution')
        f.readline()                        # skip OPTGAP
        line = f.readline()
        if line.startswith('PTNL'):
            # this file has only one target
            hab = parse_header_line(line, 'PTNL_HABITAT')
            dct['habitat'].append(float(hab))
            f.readline()                    # skip NETGAIN
        else:
            # multiple targets; skip past individual weights and targets
            while line := f.readline():
                if line.startswith('WT_PTNL_HAB'):
                    break
            hab = parse_header_line(line, 'WT_PTNL_HAB')
            dct['habitat'].append(float(hab))
            f.readline()                    # skip WT_NETGAIN
        f.readline()                        # skip blank line
        f.readline()                        # skip header
        lst = []
        while line := f.readline():
            name, action = line.strip().split()
            if action == '1':
                lst.append(name)
        dct['gates'].append(lst)

add_potential_habitat()

Compute the potential habitat available after restoration, using the original unscaled habitat values. Adds a new table named summary: one column for each target, showing the potential habitat gain at each budget level, then the weighted potential habitat over all targets, and finally the net gain.

Source code in app/optipass.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def add_potential_habitat(self):
    '''
    Compute the potential habitat available after restoration, using
    the original unscaled habitat values.  Adds a new table named summary:
    one column for each target, showing the potential habitat gain at each 
    budget level, then the weighted potential habitat over all targets, and
    finally the net gain.
    '''
    # make a copy of the passability data with NaN replaced by 0s and using the
    # barrier ID as the index
    df = self.passability.fillna(0).set_index('ID')
    wph = np.zeros(len(self.summary))
    for i in range(len(self.targets)):
        t = self.mapping.iloc[i]
        cp = self._ah(t, df) * self.weights[i]
        wph += cp
        col = pd.DataFrame({t.name: cp})
        self.summary = pd.concat([self.summary, col], axis=1)
        gain = self._gain(t.name, t, df)
        mcol = df[t.unscaled]
        mcol.name = t.name
        self.matrix = pd.concat([self.matrix, mcol, gain], axis=1)
    self.summary = pd.concat([self.summary, pd.DataFrame({'wph': wph})], axis = 1)
    self.summary['netgain'] = self.summary.habitat - self.summary.habitat[0]