Commit 95c143a (v1.0)
romeroqe committed Aug 4, 2020 · 0 parents
Showing 16 changed files with 1,274 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
*.pyc
/dist/
/*.egg-info
*.DS_Store
395 changes: 395 additions & 0 deletions LICENSE

28 changes: 28 additions & 0 deletions README.md
@@ -0,0 +1,28 @@
# cluster_qc
A package that filters Argo profiles with a point-in-polygon algorithm, downloads their source files, generates diagrams, and filters RTQC data through cluster analysis.

## Installation

To install this package, first clone the repository or download the zip file. Then, from its root folder, install it with:

```
pip install .
```

## Demos

To see the package demos, go to the `demo` folder and run the following files:

- `demo1.py`: Downloads the list of profiles and filters it with the point-in-polygon algorithm, using the polygon of the Exclusive Economic Zone of Mexico.
- `demo2.py`: Downloads the list of profiles, filters it with the point-in-polygon algorithm using the polygon of a zone off Baja California Sur, Mexico, then downloads the source files of those profiles and extracts their data from the NetCDF files to CSV.
- `demo3.py`: Plots six diagrams of the data from an autonomous hydrographic profiler.
- `demo4.py`: Performs a cluster analysis to keep the RTQC data that follows the same patterns as the DMQC data.

The demo files must be executed in order. Downloading the profile source files may take some time, depending on your Internet connection. A minimal sketch of the overall workflow is shown below.
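The demo scripts are the reference for the exact arguments; the snippet below is only a minimal sketch of the workflow they follow, assembled from the package's public functions. The folder names, the `polygon.csv` file, the index-reading options, the `MONTH` derivation, and the depth value are placeholders, not values taken from the demos.

```
import os
import cluster_qc
from pandas import read_csv

storage_path = "data"          # placeholder output folder
source_path = "data/profiles"  # placeholder folder for downloaded NetCDF files
os.makedirs(storage_path, exist_ok=True)
os.makedirs(source_path, exist_ok=True)

# Download the global Argo profile index (ar_index_global_prof.txt)
cluster_qc.get_index(storage_path)

# The index and the polygon are DataFrames with 'latitude' and 'longitude' columns
# (the index also has a 'file' column); the header handling below is a placeholder.
index = read_csv(f"{storage_path}/ar_index_global_prof.txt", header=8)
polygon = read_csv("polygon.csv")

# Keep only the profiles whose positions fall inside the polygon (demo1/demo2)
profiles = cluster_qc.get_profiles_within_polygon(index, polygon, storage_path)

# Download the source NetCDF files and extract their data to measurements.csv (demo2)
cluster_qc.download_data(profiles.file, source_path)
cluster_qc.get_data_from_source(profiles.file, source_path, storage_path)

# Cluster analysis on the extracted measurements (demo4); cluster_analysis expects a
# MONTH column ("01".."12") and a pressure threshold (the value here is a placeholder)
measurements = read_csv(f"{storage_path}/measurements.csv")
measurements["MONTH"] = measurements.DATE.str[5:7]
filtered = cluster_qc.cluster_analysis(measurements, storage_path, -1500)
```

Note that `cluster_analysis` negates the `PRES` column and runs its k-means only on measurements below the given threshold, which is why the threshold is passed as a negative value in this sketch.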

## Argo data acknowledgment
These data were collected and made freely available by the International Argo Program and the national programs that contribute to it ([http://www.argo.ucsd.edu](http://www.argo.ucsd.edu), [http://argo.jcommops.org](http://argo.jcommops.org)). The Argo Program is part of the Global Ocean Observing System.

## License

<a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>.
13 changes: 13 additions & 0 deletions cluster_qc/__init__.py
@@ -0,0 +1,13 @@
from .data import download_data
from .data import filter_point_in_polygon
from .data import get_data_from_nc
from .data import get_data_from_source
from .data import get_index
from .data import get_profiles_within_polygon
from .data import is_inside_the_polygon

from .filters import cluster_analysis

from .plot import plot_DMQC_vs_RTQC
from .plot import plot_filtered_profiles_data
from .plot import plot_six_diagrams
124 changes: 124 additions & 0 deletions cluster_qc/data.py
@@ -0,0 +1,124 @@
import os
import csv
import subprocess
import matplotlib.pyplot as plt

from math import ceil
from tqdm import tqdm
from pandas import read_csv
from netCDF4 import Dataset, num2date
from multiprocessing import cpu_count, Process

from .plot import plot_filtered_profiles_data

def download_data(files, storage_path):
    # Download data from the Argo rsync server
    with tqdm(total=files.shape[0]) as pbar:
        for file in files:
            subprocess.call(["rsync", "-azh", f"vdmzrs.ifremer.fr::argo/{file}", storage_path])
            pbar.update(1)

def filter_point_in_polygon(data, start, end, polygon, thread, storage_path, file_name, source_path):
    N = len(polygon)
    with open(f'{storage_path}/{file_name}-{thread}.csv', 'a', newline='') as file:
        writer = csv.writer(file)
        for i in range(start, end):
            # Point-in-polygon filter
            if(is_inside_the_polygon(polygon, N, [data.latitude.values[i],data.longitude.values[i]])):
                writer.writerow(data.values[i])

def get_data_from_nc(data, start, end, polygon, thread, storage_path, file_name, source_path):
    with open(f'{storage_path}/{file_name}-{thread}.csv', 'a', newline='') as file:
        writer = csv.writer(file)
        measurements = []
        for k in range(start, end):
            try:
                # Extract data from NetCDF files
                nc = Dataset(f"{source_path}/{data.values[k]}")
                PLATFORM_NUMBER = nc.variables['PLATFORM_NUMBER'][:]
                CYCLE_NUMBER = nc.variables['CYCLE_NUMBER'][:]
                DATA_MODE = nc.variables['DATA_MODE'][:]
                JULD = nc.variables['JULD']
                JULD = num2date(JULD[:],JULD.units)
                LATITUDE = nc.variables['LATITUDE'][:]
                LONGITUDE = nc.variables['LONGITUDE'][:]
                PRES = nc.variables['PRES'][:]
                PRES_ADJUSTED = nc.variables['PRES_ADJUSTED'][:]
                TEMP = nc.variables['TEMP'][:]
                TEMP_ADJUSTED = nc.variables['TEMP_ADJUSTED'][:]
                PSAL = nc.variables['PSAL'][:]
                PSAL_ADJUSTED = nc.variables['PSAL_ADJUSTED'][:]
                for j in range(PRES_ADJUSTED.shape[1]):
                    if(str(DATA_MODE[0], 'utf-8').strip() == 'R'):
                        # Real-time profiles: use the raw variables
                        if(PRES[0][j] > 0 and TEMP[0][j] > 0 and PSAL[0][j] > 0):
                            measurements.append([str(PLATFORM_NUMBER[0], 'utf-8').strip(),CYCLE_NUMBER[0],str(DATA_MODE[0], 'utf-8').strip(),JULD[0],LATITUDE[0],LONGITUDE[0],PRES[0][j],TEMP[0][j],PSAL[0][j]])
                    else:
                        # Adjusted or delayed-mode profiles: use the adjusted variables
                        if(PRES_ADJUSTED[0][j] > 0 and TEMP_ADJUSTED[0][j] > 0 and PSAL_ADJUSTED[0][j] > 0):
                            measurements.append([str(PLATFORM_NUMBER[0], 'utf-8').strip(),CYCLE_NUMBER[0],str(DATA_MODE[0], 'utf-8').strip(),JULD[0],LATITUDE[0],LONGITUDE[0],PRES_ADJUSTED[0][j],TEMP_ADJUSTED[0][j],PSAL_ADJUSTED[0][j]])
            except Exception:
                print(f"File [error]: {data.values[k]}")
        writer.writerows(measurements)

def get_data_from_source(files, source_path, storage_path):
    columns = ['PLATFORM_NUMBER','CYCLE_NUMBER','DATA_MODE','DATE','LATITUDE','LONGITUDE','PRES','TEMP','PSAL']
    # Execute parallel computation with function "get_data_from_nc"
    exec_parallel_computation(files, columns, get_data_from_nc, storage_path, "measurements", source_path=source_path)

def get_index(storage_path):
    # Download the global Argo profile index
    subprocess.call(["rsync", "-azh", "vdmzrs.ifremer.fr::argo-index/ar_index_global_prof.txt", storage_path])

def get_profiles_within_polygon(data, polygon, storage_path):
    # Maximum and minimum filter (bounding box of the polygon)
    filtered_data = data[(data.latitude > polygon.latitude.min()) & (data.latitude < polygon.latitude.max()) & (data.longitude > polygon.longitude.min()) & (data.longitude < polygon.longitude.max())].reset_index()
    # Execute parallel computation
    exec_parallel_computation(filtered_data, filtered_data.columns, filter_point_in_polygon, storage_path, "filtered_profiles", polygon)
    filtered_profiles = read_csv(f"{storage_path}/filtered_profiles.csv")
    # Plot the study area
    plot_filtered_profiles_data(polygon, filtered_profiles, data, storage_path)
    return filtered_profiles

def is_inside_the_polygon(polygon, N, p):
    xinters = 0
    counter = 0
    p1 = polygon.iloc[0]
    # Even-odd algorithm
    for i in range(1, N+1):
        p2 = polygon.iloc[i % N]
        if (p[0] > min(p1[0],p2[0])):
            if (p[0] <= max(p1[0],p2[0])):
                if (p[1] <= max(p1[1],p2[1])):
                    if (p1[0] != p2[0]):
                        xinters = (p[0]-p1[0])*(p2[1]-p1[1])/(p2[0]-p1[0])+p1[1]
                    if (p1[1] == p2[1] or p[1] <= xinters):
                        counter += 1
        p1 = p2
    return counter % 2 != 0

def exec_parallel_computation(data, columns, function, storage_path, file_name, polygon=[], source_path=""):
    # Get number of CPUs in the system
    processes = []
    cpucount = cpu_count()
    r_range = ceil(data.shape[0]/cpucount)
    # Parallel computation
    for i in range(cpucount):
        with open(f"{storage_path}/{file_name}-{i}.csv", 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(columns)
        start = i * r_range
        end = start + r_range
        if(end > data.shape[0]):
            end = data.shape[0]
        p = Process(target=function, args=(data, start, end, polygon, i, storage_path, file_name, source_path))
        processes.append(p)
        p.start()
    # Wait until every worker process has finished
    for p in processes:
        p.join()
    # Collect the partial results into a single CSV file
    filtered_profiles_path = f"{storage_path}/{file_name}.csv"
    with open(filtered_profiles_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(columns)
        for i in range(cpucount):
            writer.writerows(read_csv(f"{storage_path}/{file_name}-{i}.csv").values)
            os.remove(f"{storage_path}/{file_name}-{i}.csv")
85 changes: 85 additions & 0 deletions cluster_qc/filters.py
@@ -0,0 +1,85 @@
import matplotlib.pyplot as plt

from pandas import DataFrame
from datetime import datetime
from scipy import interpolate
from numpy import array, unique
from sklearn.cluster import k_means

from .plot import plot_DMQC_vs_RTQC

months = ["01","02","03","04","05","06","07","08","09","10","11","12"]

# Per-month cluster analysis: k-means on salinity is applied iteratively (see kmeans below) and
# platform numbers that separate into a cluster containing no DMQC data are flagged as drifting.
# DMQC measurements and RTQC measurements from unflagged profilers get CLUSTER_QC = 1; the rest
# keep CLUSTER_QC = 2. The result is saved to filtered_measurements.csv and plotted.
def cluster_analysis(data, storage_path, depth):
    data["PRES"] = -data.PRES
    xmin = data.PSAL.min()
    xmax = data.PSAL.max()
    ymin = data.TEMP.min()
    ymax = data.TEMP.max()
    # Plot original data
    plot_DMQC_vs_RTQC(data, xmin, xmax, ymin, ymax, storage_path, "original", "Original")

    profilers = []
    filtered_measurements = DataFrame([], columns=data.columns)
    # Evaluate every month
    for month in months:
        temp_data = month_data = data[data.MONTH == month]
        if month_data[month_data.DATA_MODE != "D"].shape[0] > 0:
            flag = True
            # Cluster analysis, up to nine iterations
            for _ in range(1,10):
                if(flag):
                    # kmeans() returns a flag that can stop the loop, the original data, the remaining data, and the platform numbers with suspected salinity drifts
                    flag, original, temp_data, platform_numbers = kmeans(temp_data, depth)
                    if(flag):
                        # If the loop continues, add the platform numbers to the profiler list
                        profilers = profilers + platform_numbers
                else: break
        else: original = month_data
        # Join data
        rtqc = original[original.DATA_MODE != "D"]
        dmqc = month_data[month_data.DATA_MODE == "D"]
        filtered_measurements = filtered_measurements.append(rtqc)
        filtered_measurements = filtered_measurements.append(dmqc)

    filtered_measurements.insert(filtered_measurements.shape[1], "CLUSTER_QC", [2] * filtered_measurements.shape[0])
    profilers = unique(profilers)

    # Set CLUSTER_QC values
    def cluster_qc(row):
        if row["DATA_MODE"] == "D":
            return 1
        if row["PLATFORM_NUMBER"] not in profilers:
            return 1
        return 2

    filtered_measurements["CLUSTER_QC"] = filtered_measurements.apply(cluster_qc, axis=1)
    filtered_measurements.to_csv(f"{storage_path}/filtered_measurements.csv")
    # Plot CLUSTER_QC = 2 data
    plot_DMQC_vs_RTQC(filtered_measurements, xmin, xmax, ymin, ymax, storage_path, "cluster_qc_2", "CLUSTER_QC = 2")
    # Plot CLUSTER_QC = 1 data
    plot_DMQC_vs_RTQC(filtered_measurements[filtered_measurements.CLUSTER_QC == 1], xmin, xmax, ymin, ymax, storage_path, "cluster_qc_1", "CLUSTER_QC = 1")
    return(filtered_measurements)

# Two-cluster k-means on salinity, restricted to the measurements whose (negated) pressure lies
# below `depth` and initialized at the mid-range salinities of the DMQC and RTQC data. Returns a
# flag (False when there is nothing to separate or when the second cluster contains DMQC data,
# which stops the iteration), the original data, the profiles not assigned to the second cluster,
# and the platform numbers of the second cluster.
def kmeans(data, depth):
    original = data
    data = data[data.PRES < depth]
    if data[data.DATA_MODE != "D"].shape[0] > 0 and data[data.DATA_MODE == "D"].shape[0] > 0:
        # Get mid-range of DMQC and RTQC data
        rm_d = (data[data.DATA_MODE == "D"].PSAL.max() + data[data.DATA_MODE == "D"].PSAL.min()) / 2
        rm_r = (data[data.DATA_MODE != "D"].PSAL.max() + data[data.DATA_MODE != "D"].PSAL.min()) / 2
        # K-means algorithm
        init = array([[rm_d],[rm_r]])
        centroid, labels, loss = k_means(data[["PSAL"]], 2, init=init, n_init=1)
    else: return(False, original, [], [])

    data.insert(data.shape[1], "LABEL", labels)
    # Get data from the second group
    grouped = data[data.LABEL == 1].groupby(["PLATFORM_NUMBER","CYCLE_NUMBER"]).size().reset_index()
    platform_numbers = grouped.PLATFORM_NUMBER.unique().tolist()
    temp_data = original[(original.PLATFORM_NUMBER.isin(grouped.PLATFORM_NUMBER)) & (original.CYCLE_NUMBER.isin(grouped.CYCLE_NUMBER))]
    # If the second group has DMQC data, the flag is False
    flag = False if temp_data[temp_data.DATA_MODE == "D"].shape[0] > 0 else True
    # Get data from the first group
    temp_data = original[~((original.PLATFORM_NUMBER.isin(grouped.PLATFORM_NUMBER)) & (original.CYCLE_NUMBER.isin(grouped.CYCLE_NUMBER)))]
    return(flag, original, temp_data, platform_numbers)